This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new eee5e6290d1 [bugfix](be) exchange sink buffer may use wrong runtime 
state (#50588)
eee5e6290d1 is described below

commit eee5e6290d14e7e998c0b8eced9861956d59f0a7
Author: yiguolei <guo...@selectdb.com>
AuthorDate: Tue May 6 14:47:18 2025 +0800

    [bugfix](be) exchange sink buffer may use wrong runtime state (#50588)
    
    ### What problem does this PR solve?
    
    1. The exchange sink buffer may be shared among multiple pipeline instances. In
    this case, it should use the pipeline fragment context's runtime state.
    2. Otherwise, it should use the local runtime state.
---
 be/src/pipeline/exec/exchange_sink_buffer.h               |  1 +
 be/src/pipeline/exec/exchange_sink_operator.cpp           | 14 +++++++-------
 be/src/pipeline/exec/exchange_sink_operator.h             | 11 +++++++----
 be/src/pipeline/pipeline_fragment_context.h               |  2 --
 be/test/pipeline/operator/exchange_sink_operator_test.cpp |  2 +-
 5 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 44416ef68e1..5c650cc5132 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -310,6 +310,7 @@ private:
 
     PlanNodeId _node_id;
     std::atomic<int64_t> _rpc_count = 0;
+    // The state may be PipelineFragmentContext's if the buffer is shared by multiple instances.
     RuntimeState* _state = nullptr;
     QueryContext* _context = nullptr;
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index e944cb78f67..ebb8aad7ad5 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -106,7 +106,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
     _rpc_channels_num = channels.size() - local_size;
 
     if (!_only_local_exchange) {
-        _sink_buffer = p.get_sink_buffer(state->fragment_instance_id().lo);
+        _sink_buffer = p.get_sink_buffer(state, 
state->fragment_instance_id().lo);
         register_channels(_sink_buffer.get());
         _queue_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
                                                       
"ExchangeSinkQueueDependency", true);
@@ -349,7 +349,7 @@ void ExchangeSinkOperatorX::_init_sink_buffer() {
     for (auto fragment_instance_id : _fragment_instance_ids) {
         ins_ids.push_back(fragment_instance_id.lo);
     }
-    _sink_buffer = _create_buffer(ins_ids);
+    _sink_buffer = _create_buffer(_state, ins_ids);
 }
 
 template <typename ChannelPtrType>
@@ -572,11 +572,11 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
 }
 
 std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::_create_buffer(
-        const std::vector<InstanceLoId>& sender_ins_ids) {
+        RuntimeState* state, const std::vector<InstanceLoId>& sender_ins_ids) {
     PUniqueId id;
     id.set_hi(_state->query_id().hi);
     id.set_lo(_state->query_id().lo);
-    auto sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, 
_node_id, state(),
+    auto sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, 
_node_id, state,
                                                             sender_ins_ids);
     for (const auto& _dest : _dests) {
         sink_buffer->construct_request(_dest.fragment_instance_id);
@@ -590,17 +590,17 @@ std::shared_ptr<ExchangeSinkBuffer> 
ExchangeSinkOperatorX::_create_buffer(
 // (Note: This does not reduce the total number of RPCs.)
 // In a merge sort scenario, there are only n RPCs, so a shared sink buffer is 
not needed.
 std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer(
-        InstanceLoId sender_ins_id) {
+        RuntimeState* state, InstanceLoId sender_ins_id) {
     // When the child is SortSourceOperatorX or LocalExchangeSourceOperatorX,
     // it is an order-by scenario.
     // In this case, there is only one target instance, and no n * n RPC 
concurrency will occur.
     // Therefore, sharing a sink buffer is not necessary.
     if (_dest_is_merge) {
-        return _create_buffer({sender_ins_id});
+        return _create_buffer(state, {sender_ins_id});
     }
     if (_state->enable_shared_exchange_sink_buffer()) {
         return _sink_buffer;
     }
-    return _create_buffer({sender_ins_id});
+    return _create_buffer(state, {sender_ins_id});
 }
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index cdef5e5e119..b6a22e1a8aa 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -189,8 +189,10 @@ public:
                           const std::vector<TUniqueId>& fragment_instance_ids);
     Status init(const TDataSink& tsink) override;
 
-    RuntimeState* state() { return _state; }
-
+    // The state comes from the pipeline fragment context; it is saved in ExchangeSinkOperatorX
+    // and passed to the exchange sink buffer. Therefore, the exchange sink buffer must not be
+    // used after the pipeline fragment context is destroyed. All operations on the exchange
+    // sink buffer should hold the TaskExecutionContext.
     Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
@@ -217,7 +219,8 @@ public:
     // Therefore, a shared sink buffer is used here to limit the number of 
concurrent RPCs.
     // (Note: This does not reduce the total number of RPCs.)
     // In a merge sort scenario, there are only n RPCs, so a shared sink 
buffer is not needed.
-    std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(InstanceLoId 
sender_ins_id);
+    std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(RuntimeState* state,
+                                                        InstanceLoId 
sender_ins_id);
     vectorized::VExprContextSPtrs& tablet_sink_expr_ctxs() { return 
_tablet_sink_expr_ctxs; }
 
 private:
@@ -232,7 +235,7 @@ private:
     // The sink buffer can be shared among multiple ExchangeSinkLocalState 
instances,
     // or each ExchangeSinkLocalState can have its own sink buffer.
     std::shared_ptr<ExchangeSinkBuffer> _create_buffer(
-            const std::vector<InstanceLoId>& sender_ins_ids);
+            RuntimeState* state, const std::vector<InstanceLoId>& 
sender_ins_ids);
     std::shared_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;
     RuntimeState* _state = nullptr;
 
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 8750ce470d9..8bcbffa6ff9 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -82,8 +82,6 @@ public:
 
     PipelinePtr add_pipeline(PipelinePtr parent = nullptr, int idx = -1);
 
-    RuntimeState* get_runtime_state() { return _runtime_state.get(); }
-
     QueryContext* get_query_ctx() { return _query_ctx.get(); }
     [[nodiscard]] bool is_canceled() const { return 
_query_ctx->is_cancelled(); }
 
diff --git a/be/test/pipeline/operator/exchange_sink_operator_test.cpp 
b/be/test/pipeline/operator/exchange_sink_operator_test.cpp
index aa3fc6f7877..27fa6523e98 100644
--- a/be/test/pipeline/operator/exchange_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/exchange_sink_operator_test.cpp
@@ -61,7 +61,7 @@ struct MockExchangeSinkOperatorX : public 
ExchangeSinkOperatorX {
 
     void _init_sink_buffer() override {
         std::vector<InstanceLoId> ins_ids {fragment_instance_id.lo};
-        _sink_buffer = _create_buffer(ins_ids);
+        _sink_buffer = _create_buffer(_state, ins_ids);
     }
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to