This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5734bbcb287 [cherry-pick](branch-30) execute expr should use local states instead of operators(#40189) (#41324)
5734bbcb287 is described below

commit 5734bbcb287a4d16cd0a7db545c406d966c714fb
Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com>
AuthorDate: Fri Sep 27 19:36:45 2024 +0800

    [cherry-pick](branch-30) execute expr should use local states instead of operators(#40189) (#41324)
    
    The exprs owned by an operator cannot be executed concurrently, so
    execution should use the local state's copies of those exprs instead.
    cherry-pick from master https://github.com/apache/doris/pull/40189
---
 be/src/pipeline/exec/aggregation_source_operator.cpp        |  3 ++-
 be/src/pipeline/exec/assert_num_rows_operator.cpp           |  3 ++-
 be/src/pipeline/exec/assert_num_rows_operator.h             |  3 +++
 .../exec/distinct_streaming_aggregation_operator.cpp        |  4 ++--
 be/src/pipeline/exec/multi_cast_data_stream_source.h        |  7 ++++---
 be/src/pipeline/exec/operator.h                             | 12 +++++++-----
 be/src/pipeline/exec/repeat_operator.cpp                    |  2 +-
 be/src/pipeline/exec/streaming_aggregation_operator.cpp     |  4 ++--
 regression-test/data/javaudf_p0/test_javaudf_string.out     |  3 +++
 .../suites/javaudf_p0/test_javaudf_string.groovy            | 13 +++++++++++++
 10 files changed, 39 insertions(+), 15 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 3264ad56f3c..a5f40a431c5 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -443,7 +443,8 @@ Status AggSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* blo
     RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos));
     local_state.make_nullable_output_key(block);
     // dispose the having clause, should not be execute in prestreaming agg
-    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block, 
block->columns()));
+    
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
+                                                           block->columns()));
     local_state.do_agg_limit(block, eos);
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp 
b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index 4a51002beff..5aa27b51c45 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -116,7 +116,8 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
     }
     COUNTER_SET(local_state.rows_returned_counter(), 
local_state.num_rows_returned());
     COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
-    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block, 
block->columns()));
+    
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
+                                                           block->columns()));
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h 
b/be/src/pipeline/exec/assert_num_rows_operator.h
index 423bd69144e..dcc64f57878 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.h
+++ b/be/src/pipeline/exec/assert_num_rows_operator.h
@@ -28,6 +28,9 @@ public:
     AssertNumRowsLocalState(RuntimeState* state, OperatorXBase* parent)
             : PipelineXLocalState<FakeSharedState>(state, parent) {}
     ~AssertNumRowsLocalState() = default;
+
+private:
+    friend class AssertNumRowsOperatorX;
 };
 
 class AssertNumRowsOperatorX final : public 
StreamingOperatorX<AssertNumRowsLocalState> {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index e8efb51973e..ab71b52ae01 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -462,8 +462,8 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* 
state, vectorized::Bloc
     local_state._make_nullable_output_key(block);
     if (!_is_streaming_preagg) {
         // dispose the having clause, should not be execute in prestreaming agg
-        RETURN_IF_ERROR(
-                vectorized::VExprContext::filter_block(_conjuncts, block, 
block->columns()));
+        
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
+                                                               
block->columns()));
     }
     local_state.add_num_rows_returned(block->rows());
     COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h 
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 8ecbd23764d..c71310e3ee3 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -62,6 +62,7 @@ public:
     }
 
 private:
+    friend class MultiCastDataStreamerSourceOperatorX;
     vectorized::VExprContextSPtrs _output_expr_contexts;
     std::vector<std::shared_ptr<RuntimeFilterDependency>> _filter_dependencies;
 
@@ -95,8 +96,8 @@ public:
 
         if (_t_data_stream_sink.__isset.conjuncts) {
             
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts,
-                                                                 _conjuncts));
-            RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state, 
_row_desc()));
+                                                                 conjuncts()));
+            RETURN_IF_ERROR(vectorized::VExpr::prepare(conjuncts(), state, 
_row_desc()));
         }
         return Status::OK();
     }
@@ -107,7 +108,7 @@ public:
             RETURN_IF_ERROR(vectorized::VExpr::open(_output_expr_contexts, 
state));
         }
         if (_t_data_stream_sink.__isset.conjuncts) {
-            RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state));
+            RETURN_IF_ERROR(vectorized::VExpr::open(conjuncts(), state));
         }
         return Status::OK();
     }
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index abed7fb446a..bde2291ec3a 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -760,16 +760,18 @@ protected:
     ObjectPool* _pool = nullptr;
     std::vector<TupleId> _tuple_ids;
 
+private:
+    // The expr of operator set to private permissions, as cannot be executed 
concurrently,
+    // should use local state's expr.
     vectorized::VExprContextSPtrs _conjuncts;
+    vectorized::VExprContextSPtrs _projections;
+    // Used in common subexpression elimination to compute intermediate 
results.
+    std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
 
+protected:
     RowDescriptor _row_descriptor;
-
     std::unique_ptr<RowDescriptor> _output_row_descriptor = nullptr;
-    vectorized::VExprContextSPtrs _projections;
-
     std::vector<RowDescriptor> _intermediate_output_row_descriptor;
-    // Used in common subexpression elimination to compute intermediate 
results.
-    std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
 
     /// Resource information sent from the frontend.
     const TBackendResourceProfile _resource_profile;
diff --git a/be/src/pipeline/exec/repeat_operator.cpp 
b/be/src/pipeline/exec/repeat_operator.cpp
index 48cc427d85b..fe45e85c52d 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -234,7 +234,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, 
vectorized::Block* outp
         }
         
_child_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
     }
-    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, 
output_block,
+    
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
output_block,
                                                            
output_block->columns()));
     *eos = _child_eos && _child_block.rows() == 0;
     local_state.reached_limit(output_block, eos);
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 689a361c371..8aa1eb8df97 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -1285,8 +1285,8 @@ Status StreamingAggOperatorX::pull(RuntimeState* state, 
vectorized::Block* block
         RETURN_IF_ERROR(local_state._executor->get_result(&local_state, state, 
block, eos));
         local_state.make_nullable_output_key(block);
         // dispose the having clause, should not be execute in prestreaming agg
-        RETURN_IF_ERROR(
-                vectorized::VExprContext::filter_block(_conjuncts, block, 
block->columns()));
+        
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
+                                                               
block->columns()));
     }
     local_state.reached_limit(block, eos);
 
diff --git a/regression-test/data/javaudf_p0/test_javaudf_string.out 
b/regression-test/data/javaudf_p0/test_javaudf_string.out
index 59f2f7c776d..b42a368b028 100644
--- a/regression-test/data/javaudf_p0/test_javaudf_string.out
+++ b/regression-test/data/javaudf_p0/test_javaudf_string.out
@@ -65,3 +65,6 @@ ab***fg7      ab***fg7
 ab***fg8       ab***fg8
 ab***fg9       ab***fg9
 
+-- !select_5 --
+0
+
diff --git a/regression-test/suites/javaudf_p0/test_javaudf_string.groovy 
b/regression-test/suites/javaudf_p0/test_javaudf_string.groovy
index 6517c4b08c2..e6484a1fde1 100644
--- a/regression-test/suites/javaudf_p0/test_javaudf_string.groovy
+++ b/regression-test/suites/javaudf_p0/test_javaudf_string.groovy
@@ -86,9 +86,22 @@ suite("test_javaudf_string") {
                 test_javaudf_string
                 JOIN test_javaudf_string_2 ON test_javaudf_string.user_id = 
test_javaudf_string_2.user_id order by 1,2;
         """
+        sql """DROP TABLE IF EXISTS tbl1"""
+        sql """create table tbl1(k1 int, k2 string) distributed by hash(k1) 
buckets 1 properties("replication_num" = "1");"""
+        sql """ insert into tbl1 values(1, "5");"""
+        Integer count = 0;
+        Integer maxCount = 20;
+        while (count < maxCount) {
+            sql """  insert into tbl1 select * from tbl1;"""
+            count++
+            sleep(100);
+        }
+        sql """  insert into tbl1 select random()%10000 * 10000, "5" from 
tbl1;"""
+        qt_select_5 """ select count(0) from (select k1, max(k2) as k2 from 
tbl1 group by k1)v where java_udf_string_test(k2, 0, 1) = "asd" """;
     } finally {
         try_sql("DROP FUNCTION IF EXISTS java_udf_string_test(string, int, 
int);")
         try_sql("DROP TABLE IF EXISTS ${tableName}")
+        try_sql("DROP TABLE IF EXISTS tbl1")
         try_sql("DROP TABLE IF EXISTS test_javaudf_string_2")
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to