This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f0629da10a6 [pipelineX](fix) Fix illegal memory access (#29283)
f0629da10a6 is described below

commit f0629da10a674c04f819a60338d279db704ff5dd
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Dec 29 18:35:41 2023 +0800

    [pipelineX](fix) Fix illegal memory access (#29283)
---
 be/src/pipeline/exec/aggregation_source_operator.cpp |  2 +-
 be/src/pipeline/exec/data_queue.h                    | 10 ++++------
 be/src/pipeline/exec/union_source_operator.cpp       |  2 +-
 3 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index e80f1e3d5e8..dc476cf63a0 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -50,7 +50,7 @@ Status AggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", 
TUnit::UNIT);
     auto& p = _parent->template cast<AggSourceOperatorX>();
     if (p._is_streaming) {
-        _shared_state->data_queue->set_source_dependency(_dependency);
+        _shared_state->data_queue->set_source_dependency(info.dependency);
     }
     if (p._without_key) {
         if (p._needs_finalize) {
diff --git a/be/src/pipeline/exec/data_queue.h 
b/be/src/pipeline/exec/data_queue.h
index f0d3511a86f..4eaa768fb8c 100644
--- a/be/src/pipeline/exec/data_queue.h
+++ b/be/src/pipeline/exec/data_queue.h
@@ -28,8 +28,7 @@
 #include "util/spinlock.h"
 #include "vec/core/block.h"
 
-namespace doris {
-namespace pipeline {
+namespace doris::pipeline {
 
 class Dependency;
 
@@ -61,7 +60,7 @@ public:
     int64_t max_size_of_queue() const { return _max_size_of_queue; }
 
     bool data_exhausted() const { return _data_exhausted; }
-    void set_source_dependency(Dependency* source_dependency) {
+    void set_source_dependency(std::shared_ptr<Dependency> source_dependency) {
         _source_dependency = source_dependency;
     }
     void set_sink_dependency(Dependency* sink_dependency, int child_idx) {
@@ -105,10 +104,9 @@ private:
     static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10;
 
     // data queue is multi sink one source
-    Dependency* _source_dependency = nullptr;
+    std::shared_ptr<Dependency> _source_dependency = nullptr;
     std::vector<Dependency*> _sink_dependencies;
     SpinLock _source_lock;
 };
 
-} // namespace pipeline
-} // namespace doris
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/union_source_operator.cpp 
b/be/src/pipeline/exec/union_source_operator.cpp
index c9deae0d037..b42f941d608 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -120,7 +120,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
         ((UnionSourceDependency*)deps.front().get())->set_shared_state(ss);
     }
     RETURN_IF_ERROR(Base::init(state, info));
-    ss->data_queue.set_source_dependency(_dependency);
+    ss->data_queue.set_source_dependency(info.dependency);
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     // Const exprs materialized by this node. These exprs don't refer to any 
children.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to