This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c065364a905 [local exchange](fix) Fix correctness caused by local exchange (#41555) c065364a905 is described below commit c065364a9052f1371a51f5610725ad2598b11c75 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Wed Oct 9 11:21:27 2024 +0800 [local exchange](fix) Fix correctness caused by local exchange (#41555) For the plan `local exchange (hash shuffle) -> union -> colocated agg`, we must ensure that the local exchange uses the same hash algorithm as MPP shuffling. This problem is covered by our test cases, but it can only be reproduced on multiple BEs, so no new case is added in this PR. --- be/src/pipeline/exec/aggregation_sink_operator.h | 2 +- be/src/pipeline/exec/analytic_sink_operator.h | 2 +- .../exec/distinct_streaming_aggregation_operator.h | 2 +- be/src/pipeline/exec/hashjoin_build_sink.h | 2 +- be/src/pipeline/exec/hashjoin_probe_operator.h | 2 +- be/src/pipeline/exec/operator.h | 15 ++++--- .../exec/partitioned_hash_join_probe_operator.h | 2 +- .../exec/partitioned_hash_join_sink_operator.h | 2 +- be/src/pipeline/exec/sort_sink_operator.h | 2 +- be/src/pipeline/exec/union_sink_operator.h | 6 +++ be/src/pipeline/exec/union_source_operator.h | 5 +++ be/src/pipeline/pipeline_fragment_context.cpp | 46 ++++++++++++---------- be/src/pipeline/pipeline_task.h | 2 +- 13 files changed, 52 insertions(+), 38 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 97440de3f09..7c146c38a2b 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -148,7 +148,7 @@ public: ? DataDistribution(ExchangeType::PASSTHROUGH) : DataSinkOperatorX<AggSinkLocalState>::required_data_distribution(); } - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index cf2892eb7e6..47a6ba234b5 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -81,7 +81,7 @@ public: if (_partition_by_eq_expr_ctxs.empty()) { return {ExchangeType::PASSTHROUGH}; } else if (_order_by_eq_expr_ctxs.empty()) { - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index e5476e89421..edeb4321763 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -105,7 +105,7 @@ public: DataDistribution required_data_distribution() const override { if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) { - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index a544cdcf456..92b4ed18441 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -144,7 +144,7 @@ public: bool require_shuffled_data_distribution() const override { return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join; } - bool is_shuffled_hash_join() const override { + bool is_shuffled_operator() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } bool require_data_distribution() const override { diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index d3bca8fa7cd..dde9c00dfe4 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -155,7 +155,7 @@ public: bool require_shuffled_data_distribution() const override { return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join; } - bool is_shuffled_hash_join() const override { + bool is_shuffled_operator() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } bool require_data_distribution() const override { diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index eca7d608437..5cb83528e04 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -108,17 +108,20 @@ public: virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); } [[nodiscard]] virtual bool require_data_distribution() const { return false; } OperatorPtr child() { return _child; } - [[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; } - void set_followed_by_shuffled_join(bool followed_by_shuffled_join) { - _followed_by_shuffled_join = 
followed_by_shuffled_join; + [[nodiscard]] bool followed_by_shuffled_operator() const { + return _followed_by_shuffled_operator; } + void set_followed_by_shuffled_operator(bool followed_by_shuffled_operator) { + _followed_by_shuffled_operator = followed_by_shuffled_operator; + } + [[nodiscard]] virtual bool is_shuffled_operator() const { return false; } [[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; } protected: OperatorPtr _child = nullptr; bool _is_closed; - bool _followed_by_shuffled_join = false; + bool _followed_by_shuffled_operator = false; }; class PipelineXLocalStateBase { @@ -477,8 +480,6 @@ public: [[nodiscard]] virtual std::shared_ptr<BasicSharedState> create_shared_state() const = 0; [[nodiscard]] virtual DataDistribution required_data_distribution() const; - [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; } - Status close(RuntimeState* state) override { return Status::InternalError("Should not reach here!"); } @@ -663,8 +664,6 @@ public: [[nodiscard]] virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) = 0; - [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; } - Status close(RuntimeState* state) override; [[nodiscard]] virtual const RowDescriptor& intermediate_row_desc() const { diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 8cccc9f8fae..3aab11f62d8 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -168,7 +168,7 @@ public: bool require_shuffled_data_distribution() const override { return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; } - bool is_shuffled_hash_join() const override { + bool is_shuffled_operator() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } diff --git 
a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 1376964663f..c768d7518b9 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -118,7 +118,7 @@ public: bool require_shuffled_data_distribution() const override { return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; } - bool is_shuffled_hash_join() const override { + bool is_shuffled_operator() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 0bd6dd90964..8462472dd02 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -63,7 +63,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_is_analytic_sort) { - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } else if (_merge_by_exchange) { diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 13dfb0ba637..f939950143a 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -89,6 +89,12 @@ public: } } + bool require_shuffled_data_distribution() const override { + return _followed_by_shuffled_operator; + } + + bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } + private: int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; } diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index bf32e9a25c2..2d112ebf2df 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -95,6 +95,11 @@ public: return Status::OK(); } [[nodiscard]] int get_child_count() const { return _child_size; } + bool require_shuffled_data_distribution() const override { + return _followed_by_shuffled_operator; + } + + bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } private: bool _has_data(RuntimeState* state) const { diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 32492c317ad..2d283652945 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -667,7 +667,7 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, const DescriptorTbl& descs, OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx, - const bool followed_by_shuffled_join) { + const bool followed_by_shuffled_operator) { // propagate error case if (*node_idx >= tnodes.size()) { return Status::InternalError( @@ -677,11 +677,11 @@ 
Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, const TPlanNode& tnode = tnodes[*node_idx]; int num_children = tnodes[*node_idx].num_children; - bool current_followed_by_shuffled_join = followed_by_shuffled_join; + bool current_followed_by_shuffled_operator = followed_by_shuffled_operator; OperatorPtr op = nullptr; RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe, parent == nullptr ? -1 : parent->node_id(), child_idx, - followed_by_shuffled_join)); + followed_by_shuffled_operator)); // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr)); if (parent != nullptr) { @@ -691,7 +691,7 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, *root = op; } /** - * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join. + * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators). * * For plan: * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2) @@ -704,15 +704,15 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, auto require_shuffled_data_distribution = cur_pipe->operators().empty() ? 
cur_pipe->sink()->require_shuffled_data_distribution() : op->require_shuffled_data_distribution(); - current_followed_by_shuffled_join = - (followed_by_shuffled_join || op->is_shuffled_hash_join()) && + current_followed_by_shuffled_operator = + (followed_by_shuffled_operator || op->is_shuffled_operator()) && require_shuffled_data_distribution; // rely on that tnodes is preorder of the plan for (int i = 0; i < num_children; i++) { ++*node_idx; RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr, - cur_pipe, i, current_followed_by_shuffled_join)); + cur_pipe, i, current_followed_by_shuffled_operator)); // we are expecting a child, but have used all nodes // this means we have been given a bad tree and must fail @@ -753,13 +753,13 @@ Status PipelineFragmentContext::_add_local_exchange_impl( * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment. * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`. */ - const bool followed_by_shuffled_join = operators.size() > idx - ? operators[idx]->followed_by_shuffled_join() - : cur_pipe->sink()->followed_by_shuffled_join(); + const bool followed_by_shuffled_operator = + operators.size() > idx ? operators[idx]->followed_by_shuffled_operator() + : cur_pipe->sink()->followed_by_shuffled_operator(); const bool should_disable_bucket_shuffle = bucket_seq_to_instance_idx.empty() && shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() && - followed_by_shuffled_join; + followed_by_shuffled_operator; sink.reset(new LocalExchangeSinkOperatorX( sink_id, local_exchange_id, should_disable_bucket_shuffle ? 
_total_instances : _num_instances, @@ -1199,7 +1199,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo const DescriptorTbl& descs, OperatorPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx, - const bool followed_by_shuffled_join) { + const bool followed_by_shuffled_operator) { // We directly construct the operator from Thrift because the given array is in the order of preorder traversal. // Therefore, here we need to use a stack-like structure. _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx); @@ -1321,7 +1321,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, _require_bucket_distribution)); - op->set_followed_by_shuffled_join(false); + op->set_followed_by_shuffled_operator(false); _require_bucket_distribution = true; RETURN_IF_ERROR(new_pipe->add_operator(op)); RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); @@ -1329,7 +1329,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } else { op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, _require_bucket_distribution)); - op->set_followed_by_shuffled_join(followed_by_shuffled_join); + op->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); RETURN_IF_ERROR(cur_pipe->add_operator(op)); @@ -1384,7 +1384,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); 
sink->set_dests_id({op->operator_id()}); @@ -1434,8 +1434,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); - sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join()); - op->set_followed_by_shuffled_join(op->is_shuffled_hash_join()); + sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator()); + op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } else { op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); @@ -1456,8 +1456,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); - sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join()); - op->set_followed_by_shuffled_join(op->is_shuffled_hash_join()); + sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator()); + op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); @@ -1487,6 +1487,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo case TPlanNodeType::UNION_NODE: { int child_count = tnode.num_children; op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -1498,6 +1499,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorPtr sink; sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(), pool, tnode, descs)); + 
sink->set_followed_by_shuffled_operator(_require_bucket_distribution); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get())); @@ -1531,7 +1533,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1571,7 +1573,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo DataSinkOperatorPtr sink; sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1582,11 +1584,13 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo case TPlanNodeType::INTERSECT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>( pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::EXCEPT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>( pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::REPEAT_NODE: { diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 36362e15813..e216ef10376 100644 --- 
a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -136,10 +136,10 @@ public: bool is_finalized() const { return _finalized; } void clear_blocking_state(bool wake_up_by_downstream = false) { - _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream; _state->get_query_ctx()->get_execution_dependency()->set_always_ready(); // We use a lock to assure all dependencies are not deconstructed here. std::unique_lock<std::mutex> lc(_dependency_lock); + _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream; if (!_finalized) { _execution_dep->set_always_ready(); for (auto* dep : _filter_dependencies) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org