This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new be164cf6155 branch-3.1: [fix](union) Local shuffle for union operator #56048 #56449 (#56556)
be164cf6155 is described below

commit be164cf61557cb03de4f22e58e0c0536030c20d6
Author: Gabriel <[email protected]>
AuthorDate: Mon Sep 29 18:05:12 2025 +0800

    branch-3.1: [fix](union) Local shuffle for union operator #56048 #56449 (#56556)
    
    picked from #56048 #56449
---
 be/src/pipeline/exec/union_sink_operator.cpp |  7 ++++++-
 be/src/pipeline/exec/union_sink_operator.h   | 13 ++++++++++++-
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp
index 8467eeb1d54..cbf5199aac4 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -57,7 +57,12 @@ UnionSinkOperatorX::UnionSinkOperatorX(int child_id, int sink_id, ObjectPool* po
           
_first_materialized_child_idx(tnode.union_node.first_materialized_child_idx),
           _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
           _cur_child_id(child_id),
-          _child_size(tnode.num_children) {}
+          _child_size(tnode.num_children),
+          _distribute_exprs(tnode.__isset.distribute_expr_lists
+                                    ? tnode.distribute_expr_lists[child_id]
+                                    : std::vector<TExpr> {}) {
+    DCHECK(!tnode.__isset.distribute_expr_lists || 
tnode.distribute_expr_lists.size() > child_id);
+}
 
 Status UnionSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h
index aa94ed9a730..46459a02f47 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -94,6 +94,16 @@ public:
         return _followed_by_shuffled_operator;
     }
 
+    DataDistribution required_data_distribution() const override { // Picks the DataDistribution this sink requires from the child's seriality and _followed_by_shuffled_operator.
+        if (_child->is_serial_operator() && _followed_by_shuffled_operator) { // Serial child with a shuffled follower: hash-shuffle on this child's exprs (distribute_expr_lists[child_id], or empty if unset).
+            return DataDistribution(ExchangeType::HASH_SHUFFLE, 
_distribute_exprs);
+        } // NOTE(review): if distribute_expr_lists was not set, _distribute_exprs is empty here — confirm HASH_SHUFFLE with no keys cannot occur.
+        if (_child->is_serial_operator()) { // Serial child without a shuffled follower: request PASSTHROUGH.
+            return DataDistribution(ExchangeType::PASSTHROUGH);
+        }
+        return DataDistribution(ExchangeType::NOOP); // Non-serial child: NOOP (presumably no extra local exchange — confirm).
+    }
+
     bool is_shuffled_operator() const override { return 
_followed_by_shuffled_operator; }
 
 private:
@@ -113,6 +123,7 @@ private:
     const RowDescriptor _row_descriptor;
     const int _cur_child_id;
     const int _child_size;
+    const std::vector<TExpr> _distribute_exprs;
     int children_count() const { return _child_size; }
     bool is_child_passthrough(int child_idx) const {
         DCHECK_LT(child_idx, _child_size);
@@ -152,4 +163,4 @@ private:
 };
 
 } // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to