This is an automated email from the ASF dual-hosted git repository.

zclll pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ad8e49e07e5 [fix](pipeline)Fix set_operation not correctly setting shuffled_operator (#59293)
ad8e49e07e5 is described below

commit ad8e49e07e5d039a6e70e01ccd81ba29f55042a6
Author: Mryange <[email protected]>
AuthorDate: Sun Dec 28 22:09:01 2025 +0800

    [fix](pipeline)Fix set_operation not correctly setting shuffled_operator (#59293)
    
    ### What problem does this PR solve?
    
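    The sink operators of set operations (INTERSECT / EXCEPT), i.e. the
    build-side SetSinkOperatorX and the probe-side SetProbeSinkOperatorX,
    never called set_followed_by_shuffled_operator; only the source
    operator did, and it was passed _require_bucket_distribution rather
    than the followed_by_shuffled_operator value computed for the node.
    This change passes followed_by_shuffled_operator to the source and to
    every sink, and makes both sink operators report is_shuffled_operator().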
---
 be/src/pipeline/exec/set_probe_sink_operator.h |  2 ++
 be/src/pipeline/exec/set_sink_operator.h       |  2 ++
 be/src/pipeline/pipeline_fragment_context.cpp  | 12 +++++++-----
 be/src/pipeline/pipeline_fragment_context.h    |  3 ++-
 4 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h 
b/be/src/pipeline/exec/set_probe_sink_operator.h
index 3730d46496d..1e4ca305805 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -113,6 +113,8 @@ public:
 
     size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
+    bool is_shuffled_operator() const override { return true; }
+
 private:
     void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state);
     Status _extract_probe_column(SetProbeSinkLocalState<is_intersect>& 
local_state,
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index 45934bd4675..19c515aa813 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -118,6 +118,8 @@ public:
 
     size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
+    bool is_shuffled_operator() const override { return true; }
+
 private:
     template <class HashTableContext, bool is_intersected>
     friend struct HashTableBuild;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 77cd468efe2..59d557fd0d8 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1594,14 +1594,14 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     }
     case TPlanNodeType::INTERSECT_NODE: {
         RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
-                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
-        op->set_followed_by_shuffled_operator(_require_bucket_distribution);
+                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
+                followed_by_shuffled_operator));
         break;
     }
     case TPlanNodeType::EXCEPT_NODE: {
         RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
-                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
-        op->set_followed_by_shuffled_operator(_require_bucket_distribution);
+                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
+                followed_by_shuffled_operator));
         break;
     }
     case TPlanNodeType::REPEAT_NODE: {
@@ -1662,8 +1662,9 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
 template <bool is_intersect>
 Status PipelineFragmentContext::_build_operators_for_set_operation_node(
         ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, 
OperatorPtr& op,
-        PipelinePtr& cur_pipe, int parent_idx, int child_idx) {
+        PipelinePtr& cur_pipe, int parent_idx, int child_idx, bool 
followed_by_shuffled_operator) {
     op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, 
next_operator_id(), descs));
+    op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
     RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
 
     const auto downstream_pipeline_id = cur_pipe->id();
@@ -1683,6 +1684,7 @@ Status 
PipelineFragmentContext::_build_operators_for_set_operation_node(
             sink.reset(new SetProbeSinkOperatorX<is_intersect>(
                     child_id, next_sink_operator_id(), op->operator_id(), 
pool, tnode, descs));
         }
+        sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
         RETURN_IF_ERROR(probe_side_pipe->set_sink(sink));
         RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, 
_runtime_state.get()));
         // prepare children pipelines. if any pipeline found this as its 
father, will use the prepared pipeline to build.
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 81b3f57b01f..3578572ae52 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -140,7 +140,8 @@ private:
     Status _build_operators_for_set_operation_node(ObjectPool* pool, const 
TPlanNode& tnode,
                                                    const DescriptorTbl& descs, 
OperatorPtr& op,
                                                    PipelinePtr& cur_pipe, int 
parent_idx,
-                                                   int child_idx);
+                                                   int child_idx,
+                                                   bool 
followed_by_shuffled_operator);
 
     Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
                              const std::vector<TExpr>& output_exprs,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to