This is an automated email from the ASF dual-hosted git repository.
zclll pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ad8e49e07e5 [fix](pipeline)Fix set_operation not correctly setting
shuffled_operator (#59293)
ad8e49e07e5 is described below
commit ad8e49e07e5d039a6e70e01ccd81ba29f55042a6
Author: Mryange <[email protected]>
AuthorDate: Sun Dec 28 22:09:01 2025 +0800
[fix](pipeline)Fix set_operation not correctly setting shuffled_operator
(#59293)
### What problem does this PR solve?
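The sink operators created for set operations (INTERSECT/EXCEPT), i.e. the
set sink and the set probe sink, did not call
set_followed_by_shuffled_operator; only the set source operator did.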
---
be/src/pipeline/exec/set_probe_sink_operator.h | 2 ++
be/src/pipeline/exec/set_sink_operator.h | 2 ++
be/src/pipeline/pipeline_fragment_context.cpp | 12 +++++++-----
be/src/pipeline/pipeline_fragment_context.h | 3 ++-
4 files changed, 13 insertions(+), 6 deletions(-)
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h
b/be/src/pipeline/exec/set_probe_sink_operator.h
index 3730d46496d..1e4ca305805 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -113,6 +113,8 @@ public:
size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
+ bool is_shuffled_operator() const override { return true; }
+
private:
void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state);
Status _extract_probe_column(SetProbeSinkLocalState<is_intersect>&
local_state,
diff --git a/be/src/pipeline/exec/set_sink_operator.h
b/be/src/pipeline/exec/set_sink_operator.h
index 45934bd4675..19c515aa813 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -118,6 +118,8 @@ public:
size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
+ bool is_shuffled_operator() const override { return true; }
+
private:
template <class HashTableContext, bool is_intersected>
friend struct HashTableBuild;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 77cd468efe2..59d557fd0d8 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1594,14 +1594,14 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
}
case TPlanNodeType::INTERSECT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
- pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
- op->set_followed_by_shuffled_operator(_require_bucket_distribution);
+ pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
+ followed_by_shuffled_operator));
break;
}
case TPlanNodeType::EXCEPT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
- pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
- op->set_followed_by_shuffled_operator(_require_bucket_distribution);
+ pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
+ followed_by_shuffled_operator));
break;
}
case TPlanNodeType::REPEAT_NODE: {
@@ -1662,8 +1662,9 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
template <bool is_intersect>
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs,
OperatorPtr& op,
- PipelinePtr& cur_pipe, int parent_idx, int child_idx) {
+ PipelinePtr& cur_pipe, int parent_idx, int child_idx, bool
followed_by_shuffled_operator) {
op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode,
next_operator_id(), descs));
+ op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
const auto downstream_pipeline_id = cur_pipe->id();
@@ -1683,6 +1684,7 @@ Status
PipelineFragmentContext::_build_operators_for_set_operation_node(
sink.reset(new SetProbeSinkOperatorX<is_intersect>(
child_id, next_sink_operator_id(), op->operator_id(),
pool, tnode, descs));
}
+ sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
RETURN_IF_ERROR(probe_side_pipe->set_sink(sink));
RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode,
_runtime_state.get()));
// prepare children pipelines. if any pipeline found this as its
father, will use the prepared pipeline to build.
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 81b3f57b01f..3578572ae52 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -140,7 +140,8 @@ private:
Status _build_operators_for_set_operation_node(ObjectPool* pool, const
TPlanNode& tnode,
const DescriptorTbl& descs,
OperatorPtr& op,
PipelinePtr& cur_pipe, int
parent_idx,
- int child_idx);
+ int child_idx,
+ bool
followed_by_shuffled_operator);
Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
const std::vector<TExpr>& output_exprs,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]