This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new f0629da10a6 [pipelineX](fix) Fix illegal memory access (#29283) f0629da10a6 is described below commit f0629da10a674c04f819a60338d279db704ff5dd Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri Dec 29 18:35:41 2023 +0800 [pipelineX](fix) Fix illegal memory access (#29283) --- be/src/pipeline/exec/aggregation_source_operator.cpp | 2 +- be/src/pipeline/exec/data_queue.h | 10 ++++------ be/src/pipeline/exec/union_source_operator.cpp | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index e80f1e3d5e8..dc476cf63a0 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -50,7 +50,7 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) { _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT); auto& p = _parent->template cast<AggSourceOperatorX>(); if (p._is_streaming) { - _shared_state->data_queue->set_source_dependency(_dependency); + _shared_state->data_queue->set_source_dependency(info.dependency); } if (p._without_key) { if (p._needs_finalize) { diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h index f0d3511a86f..4eaa768fb8c 100644 --- a/be/src/pipeline/exec/data_queue.h +++ b/be/src/pipeline/exec/data_queue.h @@ -28,8 +28,7 @@ #include "util/spinlock.h" #include "vec/core/block.h" -namespace doris { -namespace pipeline { +namespace doris::pipeline { class Dependency; @@ -61,7 +60,7 @@ public: int64_t max_size_of_queue() const { return _max_size_of_queue; } bool data_exhausted() const { return _data_exhausted; } - void set_source_dependency(Dependency* source_dependency) { + void set_source_dependency(std::shared_ptr<Dependency> source_dependency) { _source_dependency = source_dependency; } void set_sink_dependency(Dependency* sink_dependency, int child_idx) { @@ -105,10 +104,9 @@ private: static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10; // data queue is multi sink one source - Dependency* _source_dependency = nullptr; + std::shared_ptr<Dependency> _source_dependency = nullptr; std::vector<Dependency*> _sink_dependencies; SpinLock _source_lock; }; -} // namespace pipeline -} // namespace doris +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index c9deae0d037..b42f941d608 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -120,7 +120,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { ((UnionSourceDependency*)deps.front().get())->set_shared_state(ss); } RETURN_IF_ERROR(Base::init(state, info)); - ss->data_queue.set_source_dependency(_dependency); + ss->data_queue.set_source_dependency(info.dependency); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); // Const exprs materialized by this node. These exprs don't refer to any children. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org