This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new b104e933cd4 [Bug](expr) execute expr should use local states instead of operators (#40189) (#40219)
b104e933cd4 is described below

commit b104e933cd46a742f7ccc380c1470735c7e5f2ed
Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com>
AuthorDate: Sun Sep 1 00:41:10 2024 +0800

    [Bug](expr) execute expr should use local states instead of operators (#40189) (#40219)
    
    ## Proposed changes
    
    cherry-pick from master #40189
    
    <!--Describe your changes.-->
---
 be/src/pipeline/exec/aggregation_source_operator.cpp        |  3 ++-
 be/src/pipeline/exec/assert_num_rows_operator.cpp           |  3 ++-
 be/src/pipeline/exec/assert_num_rows_operator.h             |  3 +++
 .../exec/distinct_streaming_aggregation_operator.cpp        |  4 ++--
 be/src/pipeline/exec/multi_cast_data_stream_source.h        |  7 ++++---
 be/src/pipeline/exec/repeat_operator.cpp                    |  2 +-
 be/src/pipeline/exec/streaming_aggregation_operator.cpp     |  4 ++--
 regression-test/data/javaudf_p0/test_javaudf_string.out     |  3 +++
 .../suites/javaudf_p0/test_javaudf_string.groovy            | 13 +++++++++++++
 9 files changed, 32 insertions(+), 10 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index fadddee9034..8f19c24589b 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -435,7 +435,8 @@ Status AggSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* blo
     RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos));
     local_state.make_nullable_output_key(block);
     // dispose the having clause, should not be execute in prestreaming agg
-    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block, 
block->columns()));
+    
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
+                                                           block->columns()));
     local_state.reached_limit(block, eos);
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp 
b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index ef0efd3f86b..8f86e3ecb87 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -120,7 +120,8 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
     }
     COUNTER_SET(local_state.rows_returned_counter(), 
local_state.num_rows_returned());
     COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
-    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block, 
block->columns()));
+    
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
+                                                           block->columns()));
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h 
b/be/src/pipeline/exec/assert_num_rows_operator.h
index 4d6d835f815..3874c1fc771 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.h
+++ b/be/src/pipeline/exec/assert_num_rows_operator.h
@@ -46,6 +46,9 @@ public:
     AssertNumRowsLocalState(RuntimeState* state, OperatorXBase* parent)
             : PipelineXLocalState<FakeSharedState>(state, parent) {}
     ~AssertNumRowsLocalState() = default;
+
+private:
+    friend class AssertNumRowsOperatorX;
 };
 
 class AssertNumRowsOperatorX final : public 
StreamingOperatorX<AssertNumRowsLocalState> {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 16c0df07b49..1635d927b6d 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -515,8 +515,8 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* 
state, vectorized::Bloc
     local_state._make_nullable_output_key(block);
     if (!_is_streaming_preagg) {
         // dispose the having clause, should not be execute in prestreaming agg
-        RETURN_IF_ERROR(
-                vectorized::VExprContext::filter_block(_conjuncts, block, 
block->columns()));
+        
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
+                                                               
block->columns()));
     }
     local_state.add_num_rows_returned(block->rows());
     COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h 
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 761e899c3d1..aab5cb96dbe 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -117,6 +117,7 @@ public:
     }
 
 private:
+    friend class MultiCastDataStreamerSourceOperatorX;
     vectorized::VExprContextSPtrs _output_expr_contexts;
     std::vector<std::shared_ptr<RuntimeFilterDependency>> _filter_dependencies;
 
@@ -151,8 +152,8 @@ public:
 
         if (_t_data_stream_sink.__isset.conjuncts) {
             
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts,
-                                                                 _conjuncts));
-            RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state, 
_row_desc()));
+                                                                 conjuncts()));
+            RETURN_IF_ERROR(vectorized::VExpr::prepare(conjuncts(), state, 
_row_desc()));
         }
         return Status::OK();
     }
@@ -163,7 +164,7 @@ public:
             RETURN_IF_ERROR(vectorized::VExpr::open(_output_expr_contexts, 
state));
         }
         if (_t_data_stream_sink.__isset.conjuncts) {
-            RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state));
+            RETURN_IF_ERROR(vectorized::VExpr::open(conjuncts(), state));
         }
         return Status::OK();
     }
diff --git a/be/src/pipeline/exec/repeat_operator.cpp 
b/be/src/pipeline/exec/repeat_operator.cpp
index 65eccc3fd4e..991754ba122 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -247,7 +247,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, 
vectorized::Block* outp
         }
         
_child_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
     }
-    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, 
output_block,
+    
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
output_block,
                                                            
output_block->columns()));
     *eos = _child_eos && _child_block.rows() == 0;
     local_state.reached_limit(output_block, eos);
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index d7589f59f9f..cfb63aae9a5 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -1299,8 +1299,8 @@ Status StreamingAggOperatorX::pull(RuntimeState* state, 
vectorized::Block* block
         RETURN_IF_ERROR(local_state._executor->get_result(&local_state, state, 
block, eos));
         local_state.make_nullable_output_key(block);
         // dispose the having clause, should not be execute in prestreaming agg
-        RETURN_IF_ERROR(
-                vectorized::VExprContext::filter_block(_conjuncts, block, 
block->columns()));
+        
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
+                                                               
block->columns()));
     }
     local_state.reached_limit(block, eos);
 
diff --git a/regression-test/data/javaudf_p0/test_javaudf_string.out 
b/regression-test/data/javaudf_p0/test_javaudf_string.out
index 59f2f7c776d..b42a368b028 100644
--- a/regression-test/data/javaudf_p0/test_javaudf_string.out
+++ b/regression-test/data/javaudf_p0/test_javaudf_string.out
@@ -65,3 +65,6 @@ ab***fg7      ab***fg7
 ab***fg8       ab***fg8
 ab***fg9       ab***fg9
 
+-- !select_5 --
+0
+
diff --git a/regression-test/suites/javaudf_p0/test_javaudf_string.groovy 
b/regression-test/suites/javaudf_p0/test_javaudf_string.groovy
index 6517c4b08c2..e6484a1fde1 100644
--- a/regression-test/suites/javaudf_p0/test_javaudf_string.groovy
+++ b/regression-test/suites/javaudf_p0/test_javaudf_string.groovy
@@ -86,9 +86,22 @@ suite("test_javaudf_string") {
                 test_javaudf_string
                 JOIN test_javaudf_string_2 ON test_javaudf_string.user_id = 
test_javaudf_string_2.user_id order by 1,2;
         """
+        sql """DROP TABLE IF EXISTS tbl1"""
+        sql """create table tbl1(k1 int, k2 string) distributed by hash(k1) 
buckets 1 properties("replication_num" = "1");"""
+        sql """ insert into tbl1 values(1, "5");"""
+        Integer count = 0;
+        Integer maxCount = 20;
+        while (count < maxCount) {
+            sql """  insert into tbl1 select * from tbl1;"""
+            count++
+            sleep(100);
+        }
+        sql """  insert into tbl1 select random()%10000 * 10000, "5" from 
tbl1;"""
+        qt_select_5 """ select count(0) from (select k1, max(k2) as k2 from 
tbl1 group by k1)v where java_udf_string_test(k2, 0, 1) = "asd" """;
     } finally {
         try_sql("DROP FUNCTION IF EXISTS java_udf_string_test(string, int, 
int);")
         try_sql("DROP TABLE IF EXISTS ${tableName}")
+        try_sql("DROP TABLE IF EXISTS tbl1")
         try_sql("DROP TABLE IF EXISTS test_javaudf_string_2")
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to