This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new eee5e6290d1 [bugfix](be) exchange sink buffer may use wrong runtime state (#50588) eee5e6290d1 is described below commit eee5e6290d14e7e998c0b8eced9861956d59f0a7 Author: yiguolei <guo...@selectdb.com> AuthorDate: Tue May 6 14:47:18 2025 +0800 [bugfix](be) exchange sink buffer may use wrong runtime state (#50588) ### What problem does this PR solve? 1. The exchange sink buffer may be shared among different pipelines. In this case, it should use the pipeline fragment context's runtime state. 2. Otherwise, it should use the local runtime state. --- be/src/pipeline/exec/exchange_sink_buffer.h | 1 + be/src/pipeline/exec/exchange_sink_operator.cpp | 14 +++++++------- be/src/pipeline/exec/exchange_sink_operator.h | 11 +++++++---- be/src/pipeline/pipeline_fragment_context.h | 2 -- be/test/pipeline/operator/exchange_sink_operator_test.cpp | 2 +- 5 files changed, 16 insertions(+), 14 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 44416ef68e1..5c650cc5132 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -310,6 +310,7 @@ private: PlanNodeId _node_id; std::atomic<int64_t> _rpc_count = 0; + // The state may be from PipelineFragmentContext if it is shared among multi instances. 
RuntimeState* _state = nullptr; QueryContext* _context = nullptr; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index e944cb78f67..ebb8aad7ad5 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -106,7 +106,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _rpc_channels_num = channels.size() - local_size; if (!_only_local_exchange) { - _sink_buffer = p.get_sink_buffer(state->fragment_instance_id().lo); + _sink_buffer = p.get_sink_buffer(state, state->fragment_instance_id().lo); register_channels(_sink_buffer.get()); _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), "ExchangeSinkQueueDependency", true); @@ -349,7 +349,7 @@ void ExchangeSinkOperatorX::_init_sink_buffer() { for (auto fragment_instance_id : _fragment_instance_ids) { ins_ids.push_back(fragment_instance_id.lo); } - _sink_buffer = _create_buffer(ins_ids); + _sink_buffer = _create_buffer(_state, ins_ids); } template <typename ChannelPtrType> @@ -572,11 +572,11 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { } std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::_create_buffer( - const std::vector<InstanceLoId>& sender_ins_ids) { + RuntimeState* state, const std::vector<InstanceLoId>& sender_ins_ids) { PUniqueId id; id.set_hi(_state->query_id().hi); id.set_lo(_state->query_id().lo); - auto sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, _node_id, state(), + auto sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, _node_id, state, sender_ins_ids); for (const auto& _dest : _dests) { sink_buffer->construct_request(_dest.fragment_instance_id); @@ -590,17 +590,17 @@ std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::_create_buffer( // (Note: This does not reduce the total number of RPCs.) 
// In a merge sort scenario, there are only n RPCs, so a shared sink buffer is not needed. std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer( - InstanceLoId sender_ins_id) { + RuntimeState* state, InstanceLoId sender_ins_id) { // When the child is SortSourceOperatorX or LocalExchangeSourceOperatorX, // it is an order-by scenario. // In this case, there is only one target instance, and no n * n RPC concurrency will occur. // Therefore, sharing a sink buffer is not necessary. if (_dest_is_merge) { - return _create_buffer({sender_ins_id}); + return _create_buffer(state, {sender_ins_id}); } if (_state->enable_shared_exchange_sink_buffer()) { return _sink_buffer; } - return _create_buffer({sender_ins_id}); + return _create_buffer(state, {sender_ins_id}); } } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index cdef5e5e119..b6a22e1a8aa 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -189,8 +189,10 @@ public: const std::vector<TUniqueId>& fragment_instance_ids); Status init(const TDataSink& tsink) override; - RuntimeState* state() { return _state; } - + // The state is from pipeline fragment context, it will be saved in ExchangeSinkOperator + // and it will be passed to exchange sink buffer. So that exchange sink buffer should not + // be used after pipeline fragment ctx. All operations in Exchange Sink Buffer should hold + // TaskExecutionContext. Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; @@ -217,7 +219,8 @@ public: // Therefore, a shared sink buffer is used here to limit the number of concurrent RPCs. // (Note: This does not reduce the total number of RPCs.) // In a merge sort scenario, there are only n RPCs, so a shared sink buffer is not needed. 
- std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(InstanceLoId sender_ins_id); + std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(RuntimeState* state, + InstanceLoId sender_ins_id); vectorized::VExprContextSPtrs& tablet_sink_expr_ctxs() { return _tablet_sink_expr_ctxs; } private: @@ -232,7 +235,7 @@ private: // The sink buffer can be shared among multiple ExchangeSinkLocalState instances, // or each ExchangeSinkLocalState can have its own sink buffer. std::shared_ptr<ExchangeSinkBuffer> _create_buffer( - const std::vector<InstanceLoId>& sender_ins_ids); + RuntimeState* state, const std::vector<InstanceLoId>& sender_ins_ids); std::shared_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr; RuntimeState* _state = nullptr; diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 8750ce470d9..8bcbffa6ff9 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -82,8 +82,6 @@ public: PipelinePtr add_pipeline(PipelinePtr parent = nullptr, int idx = -1); - RuntimeState* get_runtime_state() { return _runtime_state.get(); } - QueryContext* get_query_ctx() { return _query_ctx.get(); } [[nodiscard]] bool is_canceled() const { return _query_ctx->is_cancelled(); } diff --git a/be/test/pipeline/operator/exchange_sink_operator_test.cpp b/be/test/pipeline/operator/exchange_sink_operator_test.cpp index aa3fc6f7877..27fa6523e98 100644 --- a/be/test/pipeline/operator/exchange_sink_operator_test.cpp +++ b/be/test/pipeline/operator/exchange_sink_operator_test.cpp @@ -61,7 +61,7 @@ struct MockExchangeSinkOperatorX : public ExchangeSinkOperatorX { void _init_sink_buffer() override { std::vector<InstanceLoId> ins_ids {fragment_instance_id.lo}; - _sink_buffer = _create_buffer(ins_ids); + _sink_buffer = _create_buffer(_state, ins_ids); } }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional 
commands, e-mail: commits-h...@doris.apache.org