This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 276625d17d4 [pipelineX](refactor) refine relationship between shared state and dependency (#30294) 276625d17d4 is described below commit 276625d17d4b5e189250d15b8b8865b0a19819b8 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Wed Jan 24 17:00:48 2024 +0800 [pipelineX](refactor) refine relationship between shared state and dependency (#30294) --- be/src/pipeline/exec/aggregation_sink_operator.h | 14 ++-- be/src/pipeline/exec/aggregation_source_operator.h | 2 +- be/src/pipeline/exec/analytic_sink_operator.h | 4 - be/src/pipeline/exec/analytic_source_operator.h | 4 +- .../pipeline/exec/multi_cast_data_stream_sink.cpp | 11 --- be/src/pipeline/exec/multi_cast_data_stream_sink.h | 3 +- .../exec/multi_cast_data_stream_source.cpp | 7 -- .../pipeline/exec/multi_cast_data_stream_source.h | 1 - be/src/pipeline/exec/multi_cast_data_streamer.cpp | 32 ++----- be/src/pipeline/exec/multi_cast_data_streamer.h | 13 +-- be/src/pipeline/exec/set_probe_sink_operator.h | 2 +- be/src/pipeline/exec/set_sink_operator.h | 2 +- be/src/pipeline/exec/set_source_operator.cpp | 9 +- be/src/pipeline/exec/union_source_operator.cpp | 28 ++----- be/src/pipeline/exec/union_source_operator.h | 1 - be/src/pipeline/pipeline_x/dependency.cpp | 6 +- be/src/pipeline/pipeline_x/dependency.h | 97 +++++----------------- .../local_exchange_source_operator.cpp | 14 +--- .../local_exchange_source_operator.h | 1 - be/src/pipeline/pipeline_x/operator.cpp | 76 +++++++++++------ be/src/pipeline/pipeline_x/operator.h | 23 +++-- .../pipeline_x/pipeline_x_fragment_context.cpp | 12 ++- be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 20 +++-- be/src/pipeline/pipeline_x/pipeline_x_task.h | 26 +++++- 24 files changed, 164 insertions(+), 244 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index bf41fe30536..a7b30d46117 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -53,9 +53,8 @@ public: ~AggSinkDependency() override = default; void set_ready() override { - std::shared_ptr<BasicSharedState> shared_state = _shared_state; - if (shared_state && _is_streaming_agg_state(shared_state)) { - if (((SharedState*)shared_state.get())->data_queue->has_enough_space_to_push()) { + if (_shared_state && _is_streaming_agg_state(_shared_state)) { + if (((SharedState*)_shared_state)->data_queue->has_enough_space_to_push()) { Dependency::set_ready(); } } else { @@ -64,9 +63,8 @@ public: } void block() override { - std::shared_ptr<BasicSharedState> shared_state = _shared_state; - if (_is_streaming_agg_state(shared_state)) { - if (!((SharedState*)shared_state.get())->data_queue->has_enough_space_to_push()) { + if (_is_streaming_agg_state(_shared_state)) { + if (!((SharedState*)_shared_state)->data_queue->has_enough_space_to_push()) { Dependency::block(); } } else { @@ -75,8 +73,8 @@ public: } private: - static bool _is_streaming_agg_state(const std::shared_ptr<BasicSharedState>& shared_state) { - return ((SharedState*)shared_state.get())->data_queue != nullptr; + static bool _is_streaming_agg_state(const BasicSharedState* shared_state) { + return ((SharedState*)shared_state)->data_queue != nullptr; } }; diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index a7125c42ff6..c03aefcc04b 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -63,7 +63,7 @@ public: private: bool _is_streaming_agg_state() { - return ((SharedState*)Dependency::_shared_state.get())->data_queue != nullptr; + return ((SharedState*)Dependency::_shared_state)->data_queue != nullptr; } }; diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 0214b22c006..064d68f189a 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -63,10 +63,6 @@ public: : PipelineXSinkLocalState<AnalyticSinkDependency>(parent, state) {} Status init(RuntimeState* state, LocalSinkStateInfo& info) override; - Status close(RuntimeState* state, Status exec_status) override { - _shared_state->release_sink_dep(); - return PipelineXSinkLocalState<AnalyticSinkDependency>::close(state, exec_status); - } private: friend class AnalyticSinkOperatorX; diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h index dc86bc95062..f4e2f10a719 100644 --- a/be/src/pipeline/exec/analytic_source_operator.h +++ b/be/src/pipeline/exec/analytic_source_operator.h @@ -84,9 +84,7 @@ private: if (need_more_input) { _dependency->block(); _dependency->set_ready_to_write(); - if (!_shared_state->sink_released_flag) { - _shared_state->sink_dep->set_ready(); - } + _shared_state->sink_dep->set_ready(); } else { _dependency->set_block_to_write(); _dependency->set_ready(); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp index 8a45634027f..6b5506be269 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp @@ -35,15 +35,4 @@ std::string MultiCastDataStreamSinkLocalState::name_suffix() { return id_name; } -Status MultiCastDataStreamSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { - auto multi_cast_data_streamer = static_cast<MultiCastDataStreamSinkOperatorX*>(_parent) - ->create_multi_cast_data_streamer(); - auto& deps = info.dependencys; - for (auto dep : deps) { - ((MultiCastSinkDependency*)dep.get())->set_shared_state(multi_cast_data_streamer); - } - RETURN_IF_ERROR(Base::init(state, info)); - return Status::OK(); -} - } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h index 965605fd3c7..84a75720a66 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h @@ -59,7 +59,6 @@ class MultiCastDataStreamSinkLocalState final friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>; using Base = PipelineXSinkLocalState<MultiCastSinkDependency>; using Parent = MultiCastDataStreamSinkOperatorX; - Status init(RuntimeState* state, LocalSinkStateInfo& info) override; std::string name_suffix() override; private: @@ -113,7 +112,7 @@ private: friend class MultiCastDataStreamSinkLocalState; ObjectPool* _pool; RowDescriptor _row_desc; - int _cast_sender_count; + const int _cast_sender_count; const TMultiCastDataStreamSink& _sink; friend class MultiCastDataStreamSinkLocalState; }; diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index d360e2eb5dd..6ac06ee5f10 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -152,13 +152,6 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState return Status::OK(); } -Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) { - _shared_state->multi_cast_data_streamer.released_dependency( - _parent->cast<Parent>()._consumer_id); - RETURN_IF_ERROR(Base::close(state)); - return Status::OK(); -} - Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 73c506f9cb8..5dae2be31f7 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -117,7 +117,6 @@ public: RETURN_IF_ERROR(_acquire_runtime_filter()); return Status::OK(); } - Status close(RuntimeState* state) override; friend class MultiCastDataStreamerSourceOperatorX; RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); } diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp index f3e44731aef..175a21469b8 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp +++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp @@ -104,37 +104,15 @@ void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) { if (_dependencies.empty()) { return; } - if (_dependencies_release_flag[sender_idx]) { - return; - } - { - std::unique_lock<std::mutex> lc(_release_lock); - if (_dependencies_release_flag[sender_idx]) { - return; - } - auto* dep = _dependencies[sender_idx]; - DCHECK(dep); - dep->set_ready(); - } + auto* dep = _dependencies[sender_idx]; + DCHECK(dep); + dep->set_ready(); } void MultiCastDataStreamer::_set_ready_for_read() { - size_t i = 0; for (auto* dep : _dependencies) { - if (_dependencies_release_flag[i]) { - i++; - continue; - } - { - std::unique_lock<std::mutex> lc(_release_lock); - if (_dependencies_release_flag[i]) { - i++; - continue; - } - DCHECK(dep); - dep->set_ready(); - i++; - } + DCHECK(dep); + dep->set_ready(); } } diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h index 7f221d622c0..5e4179e0cad 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.h +++ b/be/src/pipeline/exec/multi_cast_data_streamer.h @@ -38,14 +38,10 @@ public: bool with_dependencies = false) : _row_desc(row_desc), _profile(pool->add(new RuntimeProfile("MultiCastDataStreamSink"))), - _cast_sender_count(cast_sender_count), - _dependencies_release_flag(cast_sender_count) { + _cast_sender_count(cast_sender_count) { _sender_pos_to_read.resize(cast_sender_count, _multi_cast_blocks.end()); if (with_dependencies) { _dependencies.resize(cast_sender_count, nullptr); - for (size_t i = 0; i < cast_sender_count; i++) { - _dependencies_release_flag[i] = false; - } } _peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES); @@ -83,11 +79,6 @@ public: _block_reading(sender_idx); } - void released_dependency(int sender_idx) { - std::unique_lock<std::mutex> lc(_release_lock); - _dependencies_release_flag[sender_idx] = true; - } - private: void _set_ready_for_read(int sender_idx); void _set_ready_for_read(); @@ -106,8 +97,6 @@ private: RuntimeProfile::Counter* _process_rows = nullptr; RuntimeProfile::Counter* _peak_mem_usage = nullptr; - std::mutex _release_lock; - std::vector<std::atomic<bool>> _dependencies_release_flag; std::vector<MultiCastSourceDependency*> _dependencies; }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index 6ac09ccfd9d..b8687376059 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -76,7 +76,7 @@ public: void set_cur_child_id(int id) { _child_idx = id; - ((SetSharedState*)_shared_state.get())->probe_finished_children_dependency[id] = this; + ((SetSharedState*)_shared_state)->probe_finished_children_dependency[id] = this; block(); } diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index b851e18ce5b..946720cd179 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -68,7 +68,7 @@ public: ~SetSinkDependency() override = default; void set_cur_child_id(int id) { - ((SetSharedState*)_shared_state.get())->probe_finished_children_dependency[id] = this; + ((SetSharedState*)_shared_state)->probe_finished_children_dependency[id] = this; } }; diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp index 03cd67477db..1e64bf8a50d 100644 --- a/be/src/pipeline/exec/set_source_operator.cpp +++ b/be/src/pipeline/exec/set_source_operator.cpp @@ -51,13 +51,14 @@ template class SetSourceOperator<false>; template <bool is_intersect> Status SetSourceLocalState<is_intersect>::init(RuntimeState* state, LocalStateInfo& info) { - std::shared_ptr<typename SetSourceDependency::SharedState> ss = nullptr; + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); auto& deps = info.upstream_dependencies; - ss.reset(new typename SetSourceDependency::SharedState(deps.size())); + _shared_state->probe_finished_children_dependency.resize(deps.size(), nullptr); for (auto& dep : deps) { - ((SetSourceDependency*)dep.get())->set_shared_state(ss); + dep->set_shared_state(_dependency->shared_state()); } - RETURN_IF_ERROR(Base::init(state, info)); return Status::OK(); } diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index e8ef1ba7207..709e89368a8 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -105,28 +105,19 @@ Status UnionSourceOperator::get_block(RuntimeState* state, vectorized::Block* bl } Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); auto& p = _parent->cast<Parent>(); int child_count = p.get_child_count(); - auto ss = create_shared_state(); if (child_count != 0) { auto& deps = info.upstream_dependencies; for (auto& dep : deps) { - ((UnionSinkDependency*)dep.get())->set_shared_state(ss); + dep->set_shared_state(_dependency->shared_state()); } - } else { - auto& deps = info.upstream_dependencies; - DCHECK(child_count == 0); - DCHECK(deps.size() == 1); - DCHECK(deps.front() == nullptr); - //child_count == 0 , we need to creat a UnionDependency - deps.front() = std::make_shared<UnionSourceDependency>( - _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); - ((UnionSourceDependency*)deps.front().get())->set_shared_state(ss); } - RETURN_IF_ERROR(Base::init(state, info)); - ss->data_queue.set_source_dependency(info.dependency); - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); + ((UnionSharedState*)_dependency->shared_state()) + ->data_queue.set_source_dependency(info.dependency); // Const exprs materialized by this node. These exprs don't refer to any children. // Only materialized by the first fragment instance to avoid duplication. if (state->per_fragment_instance_idx() == 0) { @@ -151,13 +142,6 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { return Status::OK(); } -std::shared_ptr<UnionSharedState> UnionSourceLocalState::create_shared_state() { - auto& p = _parent->cast<Parent>(); - std::shared_ptr<UnionSharedState> data_queue = - std::make_shared<UnionSharedState>(p._child_size); - return data_queue; -} - std::string UnionSourceLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 0c150a072b6..887c0cb9639 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -86,7 +86,6 @@ public: UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}; Status init(RuntimeState* state, LocalStateInfo& info) override; - std::shared_ptr<UnionSharedState> create_shared_state(); [[nodiscard]] std::string debug_string(int indentation_level = 0) const override; diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index bb27a688820..9cb6e99b44f 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -189,13 +189,9 @@ void LocalExchangeSharedState::sub_running_sink_operators() { } } -LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) - : dependencies_release_flag(num_instances) { +LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) { source_dependencies.resize(num_instances, nullptr); mem_trackers.resize(num_instances, nullptr); - for (size_t i = 0; i < num_instances; i++) { - dependencies_release_flag[i] = false; - } } } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 172f7383f3e..59f9fee3775 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -57,22 +57,22 @@ static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L * 1000L * 1000L; static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD); struct BasicSharedState { - Dependency* source_dep = nullptr; - Dependency* sink_dep = nullptr; - - std::atomic_bool source_released_flag {false}; - std::atomic_bool sink_released_flag {false}; - std::mutex source_release_lock; - std::mutex sink_release_lock; - - void release_source_dep() { - std::unique_lock<std::mutex> lc(source_release_lock); - source_released_flag = true; + template <class TARGET> + TARGET* cast() { + DCHECK(dynamic_cast<TARGET*>(this)) + << " Mismatch type! Current type is " << typeid(*this).name() + << " and expect type is" << typeid(TARGET).name(); + return reinterpret_cast<TARGET*>(this); } - void release_sink_dep() { - std::unique_lock<std::mutex> lc(sink_release_lock); - sink_released_flag = true; + template <class TARGET> + const TARGET* cast() const { + DCHECK(dynamic_cast<const TARGET*>(this)) + << " Mismatch type! Current type is " << typeid(*this).name() + << " and expect type is" << typeid(TARGET).name(); + return reinterpret_cast<const TARGET*>(this); } + DependencySPtr source_dep = nullptr; + DependencySPtr sink_dep = nullptr; virtual ~BasicSharedState() = default; }; @@ -99,11 +99,8 @@ public: [[nodiscard]] int id() const { return _id; } [[nodiscard]] virtual std::string name() const { return _name; } void add_child(std::shared_ptr<Dependency> child) { _children.push_back(child); } - std::shared_ptr<BasicSharedState> shared_state() { return _shared_state; } - void set_shared_state(std::shared_ptr<BasicSharedState> shared_state) { - _shared_state = shared_state; - } - void clear_shared_state() { _shared_state.reset(); } + BasicSharedState* shared_state() { return _shared_state; } + void set_shared_state(BasicSharedState* shared_state) { _shared_state = shared_state; } virtual std::string debug_string(int indentation_level = 0); // Start the watcher. We use it to count how long this dependency block the current pipeline task. @@ -121,47 +118,19 @@ public: virtual void set_ready(); void set_ready_to_read() { DCHECK(_is_write_dependency) << debug_string(); - if (_shared_state->source_released_flag) { - return; - } - std::unique_lock<std::mutex> lc(_shared_state->source_release_lock); - if (_shared_state->source_released_flag) { - return; - } DCHECK(_shared_state->source_dep != nullptr) << debug_string(); _shared_state->source_dep->set_ready(); } void set_block_to_read() { DCHECK(_is_write_dependency) << debug_string(); - if (_shared_state->source_released_flag) { - return; - } - std::unique_lock<std::mutex> lc(_shared_state->source_release_lock); - if (_shared_state->source_released_flag) { - return; - } DCHECK(_shared_state->source_dep != nullptr) << debug_string(); _shared_state->source_dep->block(); } void set_ready_to_write() { - if (_shared_state->sink_released_flag) { - return; - } - std::unique_lock<std::mutex> lc(_shared_state->sink_release_lock); - if (_shared_state->sink_released_flag) { - return; - } DCHECK(_shared_state->sink_dep != nullptr) << debug_string(); _shared_state->sink_dep->set_ready(); } void set_block_to_write() { - if (_shared_state->sink_released_flag) { - return; - } - std::unique_lock<std::mutex> lc(_shared_state->sink_release_lock); - if (_shared_state->sink_released_flag) { - return; - } DCHECK(_shared_state->sink_dep != nullptr) << debug_string(); _shared_state->sink_dep->block(); } @@ -180,7 +149,7 @@ protected: std::atomic<bool> _ready; const QueryContext* _query_ctx = nullptr; - std::shared_ptr<BasicSharedState> _shared_state = nullptr; + BasicSharedState* _shared_state = nullptr; MonotonicStopWatch _watcher; std::list<std::shared_ptr<Dependency>> _children; @@ -524,7 +493,6 @@ public: struct SetSharedState : public BasicSharedState { public: - SetSharedState(int num_deps) { probe_finished_children_dependency.resize(num_deps, nullptr); } /// default init vectorized::Block build_block; // build to source //record element size in hashtable @@ -666,45 +634,24 @@ public: ENABLE_FACTORY_CREATOR(LocalExchangeSharedState); LocalExchangeSharedState(int num_instances); std::unique_ptr<Exchanger> exchanger {}; - std::vector<Dependency*> source_dependencies; - std::vector<std::atomic_bool> dependencies_release_flag; - Dependency* sink_dependency; + std::vector<DependencySPtr> source_dependencies; + DependencySPtr sink_dependency; std::vector<MemTracker*> mem_trackers; std::atomic<size_t> mem_usage = 0; std::mutex le_lock; void sub_running_sink_operators(); void _set_ready_for_read() { - size_t i = 0; for (auto& dep : source_dependencies) { - if (dependencies_release_flag[i]) { - i++; - continue; - } - { - std::unique_lock<std::mutex> lc(source_release_lock); - if (dependencies_release_flag[i]) { - i++; - continue; - } - DCHECK(dep); - dep->set_ready(); - i++; - } + DCHECK(dep); + dep->set_ready(); } } - void set_dep_by_channel_id(Dependency* dep, int channel_id) { + void set_dep_by_channel_id(DependencySPtr dep, int channel_id) { source_dependencies[channel_id] = dep; } void set_ready_to_read(int channel_id) { - if (dependencies_release_flag[channel_id]) { - return; - } - std::unique_lock<std::mutex> lc(source_release_lock); - if (dependencies_release_flag[channel_id]) { - return; - } auto& dep = source_dependencies[channel_id]; DCHECK(dep) << channel_id; dep->set_ready(); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp index 9e98e3b6e8f..edfb8114811 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp @@ -22,11 +22,11 @@ namespace doris::pipeline { void LocalExchangeSourceDependency::block() { - if (((LocalExchangeSharedState*)_shared_state.get())->exchanger->_running_sink_operators == 0) { + if (((LocalExchangeSharedState*)_shared_state)->exchanger->_running_sink_operators == 0) { return; } - std::unique_lock<std::mutex> lc(((LocalExchangeSharedState*)_shared_state.get())->le_lock); - if (((LocalExchangeSharedState*)_shared_state.get())->exchanger->_running_sink_operators == 0) { + std::unique_lock<std::mutex> lc(((LocalExchangeSharedState*)_shared_state)->le_lock); + if (((LocalExchangeSharedState*)_shared_state)->exchanger->_running_sink_operators == 0) { return; } Dependency::block(); @@ -37,7 +37,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); _channel_id = info.task_idx; - _shared_state->set_dep_by_channel_id(_dependency, _channel_id); + _shared_state->set_dep_by_channel_id(info.dependency, _channel_id); _shared_state->mem_trackers[_channel_id] = _mem_tracker.get(); _exchanger = _shared_state->exchanger.get(); DCHECK(_exchanger != nullptr); @@ -61,12 +61,6 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c return fmt::to_string(debug_string_buffer); } -Status LocalExchangeSourceLocalState::close(RuntimeState* state) { - _shared_state->dependencies_release_flag[_channel_id] = true; - RETURN_IF_ERROR(Base::close(state)); - return Status::OK(); -} - Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { auto& local_state = get_local_state(state); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h index 4c95a84b533..63d71bbe08b 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h @@ -48,7 +48,6 @@ public: Status init(RuntimeState* state, LocalStateInfo& info) override; std::string debug_string(int indentation_level) const override; - Status close(RuntimeState* state) override; private: friend class LocalExchangeSourceOperatorX; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 21453dfbc3c..db687865657 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -280,18 +280,32 @@ template <> inline constexpr bool NeedToCreate<LocalExchangeSharedState> = false; template <typename LocalStateType> -void DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>& dependency, - QueryContext* ctx) { - std::shared_ptr<typename LocalStateType::DependencyType::SharedState> ss = nullptr; +void DataSinkOperatorX<LocalStateType>::get_dependency( + vector<DependencySPtr>& dependency, + std::map<int, std::shared_ptr<BasicSharedState>>& shared_states, QueryContext* ctx) { + std::shared_ptr<BasicSharedState> ss = nullptr; if constexpr (NeedToCreate<typename LocalStateType::DependencyType::SharedState>) { ss.reset(new typename LocalStateType::DependencyType::SharedState()); + DCHECK(!shared_states.contains(dests_id().front())); + if constexpr (!std::is_same_v<typename LocalStateType::DependencyType::SharedState, + FakeSharedState>) { + shared_states.insert({dests_id().front(), ss}); + } + } else if constexpr (std::is_same_v<typename LocalStateType::DependencyType::SharedState, + MultiCastSharedState>) { + ss = ((MultiCastDataStreamSinkOperatorX*)this)->create_multi_cast_data_streamer(); + auto& dests = dests_id(); + for (auto& dest_id : dests) { + DCHECK(!shared_states.contains(dest_id)); + shared_states.insert({dest_id, ss}); + } } if constexpr (!std::is_same_v<typename LocalStateType::DependencyType, FakeDependency>) { auto& dests = dests_id(); for (auto& dest_id : dests) { dependency.push_back(std::make_shared<typename LocalStateType::DependencyType>( dest_id, _node_id, ctx)); - dependency.back()->set_shared_state(ss); + dependency.back()->set_shared_state(ss.get()); } } else { dependency.push_back(nullptr); @@ -299,8 +313,23 @@ void DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>& d } template <typename LocalStateType> -DependencySPtr OperatorX<LocalStateType>::get_dependency(QueryContext* ctx) { - return std::make_shared<typename LocalStateType::DependencyType>(_operator_id, _node_id, ctx); +DependencySPtr OperatorX<LocalStateType>::get_dependency( + QueryContext* ctx, std::map<int, std::shared_ptr<BasicSharedState>>& shared_states) { + std::shared_ptr<BasicSharedState> ss = nullptr; + if constexpr (std::is_same_v<typename LocalStateType::DependencyType::SharedState, + SetSharedState>) { + ss.reset(new typename LocalStateType::DependencyType::SharedState()); + shared_states.insert({operator_id(), ss}); + } else if constexpr (std::is_same_v<typename LocalStateType::DependencyType::SharedState, + UnionSharedState>) { + ss.reset(new typename LocalStateType::DependencyType::SharedState( + ((UnionSourceOperatorX*)this)->get_child_count())); + shared_states.insert({operator_id(), ss}); + } + auto dep = + std::make_shared<typename LocalStateType::DependencyType>(_operator_id, _node_id, ctx); + dep->set_shared_state(ss.get()); + return dep; } template <typename LocalStateType> @@ -338,19 +367,20 @@ Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); auto& deps = info.upstream_dependencies; if constexpr (std::is_same_v<LocalExchangeSourceDependency, DependencyType>) { - _dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first); - _shared_state = - (typename DependencyType::SharedState*)_dependency->shared_state().get(); + _dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first.get()); + _shared_state = _dependency->shared_state() + ->template cast<typename DependencyType::SharedState>(); - _shared_state->source_dep = info.dependency.get(); - _shared_state->sink_dep = deps.front().get(); + _shared_state->source_dep = info.dependency; } else if constexpr (!is_fake_shared) { - _dependency->set_shared_state(deps.front()->shared_state()); - _shared_state = - (typename DependencyType::SharedState*)_dependency->shared_state().get(); + _dependency->set_shared_state(info.shared_state); + _shared_state = _dependency->shared_state() + ->template cast<typename DependencyType::SharedState>(); - _shared_state->source_dep = info.dependency.get(); - _shared_state->sink_dep = deps.front().get(); + _shared_state->source_dep = info.dependency; + if (!deps.empty()) { + _shared_state->sink_dep = deps.front(); + } } } @@ -382,12 +412,8 @@ Status PipelineXLocalState<DependencyType>::close(RuntimeState* state) { if (_closed) { return Status::OK(); } - if (_shared_state) { - _shared_state->release_source_dep(); - } if constexpr (!std::is_same_v<DependencyType, FakeDependency>) { COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); - _dependency->clear_shared_state(); } if (_rows_returned_counter != nullptr) { COUNTER_SET(_rows_returned_counter, _num_rows_returned); @@ -410,22 +436,21 @@ Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state, constexpr auto is_fake_shared = std::is_same_v<typename DependencyType::SharedState, FakeSharedState>; if constexpr (!std::is_same_v<FakeDependency, DependencyType>) { - auto& deps = info.dependencys; + auto& deps = info.dependencies; _dependency = (DependencyType*)deps.front().get(); if constexpr (std::is_same_v<LocalExchangeSinkDependency, DependencyType>) { _dependency = info.le_state_map[_parent->dests_id().front()].second.get(); } if (_dependency) { if constexpr (!is_fake_shared) { - _shared_state = - (typename DependencyType::SharedState*)_dependency->shared_state().get(); + _shared_state = (typename DependencyType::SharedState*)_dependency->shared_state(); } _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( _profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); } } else { - auto& deps = info.dependencys; + auto& deps = info.dependencies; deps.front() = std::make_shared<FakeDependency>(0, 0, state->get_query_ctx()); _dependency = (DependencyType*)deps.front().get(); } @@ -448,9 +473,6 @@ Status PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu } if constexpr (!std::is_same_v<DependencyType, FakeDependency>) { COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); - if constexpr (!std::is_same_v<LocalExchangeSinkDependency, DependencyType>) { - _dependency->clear_shared_state(); - } } if (_peak_memory_usage_counter) { _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index e40c7849c09..1c076bd5a69 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -34,6 +34,7 @@ struct LocalStateInfo { RuntimeProfile* parent_profile = nullptr; const std::vector<TScanRangeParams> scan_ranges; std::vector<DependencySPtr>& upstream_dependencies; + BasicSharedState* shared_state; std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<LocalExchangeSinkDependency>>> le_state_map; @@ -47,7 +48,7 @@ struct LocalSinkStateInfo { const int task_idx; RuntimeProfile* parent_profile = nullptr; const int sender_id; - std::vector<DependencySPtr>& dependencys; + std::vector<DependencySPtr>& dependencies; std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<LocalExchangeSinkDependency>>> le_state_map; @@ -187,7 +188,8 @@ public: throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name); } [[nodiscard]] std::string get_name() const override { return _op_name; } - [[nodiscard]] virtual DependencySPtr get_dependency(QueryContext* ctx) = 0; + [[nodiscard]] virtual DependencySPtr get_dependency( + QueryContext* ctx, std::map<int, std::shared_ptr<BasicSharedState>>& shared_states) = 0; [[nodiscard]] virtual DataDistribution required_data_distribution() const { return _child_x && _child_x->ignore_data_distribution() && !is_source() ? DataDistribution(ExchangeType::PASSTHROUGH) @@ -340,7 +342,9 @@ public: return state->get_local_state(operator_id())->template cast<LocalState>(); } - DependencySPtr get_dependency(QueryContext* ctx) override; + DependencySPtr get_dependency( + QueryContext* ctx, + std::map<int, std::shared_ptr<BasicSharedState>>& shared_states) override; }; template <typename DependencyArg = FakeDependency> @@ -448,10 +452,7 @@ protected: class DataSinkOperatorXBase : public OperatorBase { public: DataSinkOperatorXBase(const int operator_id, const int node_id) - : OperatorBase(nullptr), - _operator_id(operator_id), - _node_id(node_id), - _dests_id({operator_id}) {} + : OperatorBase(nullptr), _operator_id(operator_id), _node_id(node_id), _dests_id({1}) {} DataSinkOperatorXBase(const int operator_id, const int node_id, const int dest_id) : OperatorBase(nullptr), @@ -498,7 +499,9 @@ public: return reinterpret_cast<const TARGET&>(*this); } - virtual void get_dependency(std::vector<DependencySPtr>& dependency, QueryContext* ctx) = 0; + virtual void get_dependency(std::vector<DependencySPtr>& dependency, + std::map<int, std::shared_ptr<BasicSharedState>>& shared_states, + QueryContext* ctx) = 0; [[nodiscard]] virtual DataDistribution required_data_distribution() const { return _child_x && _child_x->ignore_data_distribution() ? DataDistribution(ExchangeType::PASSTHROUGH) @@ -595,7 +598,9 @@ public: ~DataSinkOperatorX() override = default; Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override; - void get_dependency(std::vector<DependencySPtr>& dependency, QueryContext* ctx) override; + void get_dependency(std::vector<DependencySPtr>& dependency, + std::map<int, std::shared_ptr<BasicSharedState>>& shared_states, + QueryContext* ctx) override; using LocalState = LocalStateType; [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index aa580b49d48..de0a544f18c 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -618,10 +618,16 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( for (auto& dep : deps) { if (pipeline_id_to_task.contains(dep)) { task->add_upstream_dependency( - pipeline_id_to_task[dep]->get_downstream_dependency()); + pipeline_id_to_task[dep]->get_downstream_dependency(), + pipeline_id_to_task[dep]->get_shared_states()); } } } + } + } + for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { + if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) { + auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()]; RETURN_IF_ERROR(prepare_and_set_parent_profile(task, pip_idx)); } } @@ -773,8 +779,8 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( } auto sink_dep = std::make_shared<LocalExchangeSinkDependency>(sink_id, local_exchange_id, _runtime_state->get_query_ctx()); - sink_dep->set_shared_state(shared_state); - shared_state->sink_dependency = sink_dep.get(); + sink_dep->set_shared_state(shared_state.get()); + shared_state->sink_dependency = sink_dep; _op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}}); // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index fece30296c9..9a88c417be0 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -60,9 +60,10 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta _task_idx(task_idx), _execution_dep(state->get_query_ctx()->get_execution_dependency()) { _pipeline_task_watcher.start(); - _sink->get_dependency(_downstream_dependency, state->get_query_ctx()); + _sink->get_dependency(_downstream_dependency, _shared_states, state->get_query_ctx()); for (auto& op : _operators) { - _source_dependency.insert({op->operator_id(), op->get_dependency(state->get_query_ctx())}); + _source_dependency.insert( + {op->operator_id(), op->get_dependency(state->get_query_ctx(), _shared_states)}); } pipeline->incr_created_tasks(); } @@ -94,8 +95,13 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) { auto& op = _operators[op_idx]; auto& deps = get_upstream_dependency(op->operator_id()); - LocalStateInfo info {parent_profile, scan_ranges, deps, - _le_state_map, _task_idx, _source_dependency[op->operator_id()]}; + LocalStateInfo info {parent_profile, + scan_ranges, + deps, + get_shared_state(op->operator_id()), + _le_state_map, + _task_idx, + _source_dependency[op->operator_id()]}; RETURN_IF_ERROR(op->setup_local_state(_state, info)); parent_profile = _state->get_local_state(op->operator_id())->profile(); query_ctx->register_query_statistics( @@ -297,9 +303,9 @@ void PipelineXTask::finalize() { std::unique_lock<std::mutex> lc(_release_lock); _finished = true; std::vector<DependencySPtr> {}.swap(_downstream_dependency); - DependencyMap {}.swap(_upstream_dependency); - std::map<int, DependencySPtr> {}.swap(_source_dependency); - + _upstream_dependency.clear(); + _source_dependency.clear(); + _shared_states.clear(); _le_state_map.clear(); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 96069cbbea2..a558dbeb40e 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -95,8 +95,10 @@ public: bool is_pending_finish() override { return _finish_blocked_dependency() != nullptr; } std::vector<DependencySPtr>& get_downstream_dependency() { return _downstream_dependency; } + std::map<int, std::shared_ptr<BasicSharedState>>& get_shared_states() { return _shared_states; } - void add_upstream_dependency(std::vector<DependencySPtr>& multi_upstream_dependency) { + void add_upstream_dependency(std::vector<DependencySPtr>& multi_upstream_dependency, + std::map<int, std::shared_ptr<BasicSharedState>>& shared_states) { for (auto dep : multi_upstream_dependency) { int dst_id = dep->id(); if (!_upstream_dependency.contains(dst_id)) { @@ -104,16 +106,31 @@ public: } else { _upstream_dependency[dst_id].push_back(dep); } + + if (shared_states.contains(dst_id) && !_shared_states.contains(dst_id)) { + // Shared state is created by upstream task's sink operator and shared by source operator of this task. + _shared_states.insert({dst_id, shared_states[dst_id]}); + } else if (_shared_states.contains(dst_id) && !shared_states.contains(dst_id)) { + // Shared state is created by this task's source operator and shared by upstream task's sink operator. + shared_states.insert({dst_id, _shared_states[dst_id]}); + } } } std::vector<DependencySPtr>& get_upstream_dependency(int id) { if (_upstream_dependency.find(id) == _upstream_dependency.end()) { - _upstream_dependency.insert({id, {DependencySPtr {}}}); + _upstream_dependency.insert({id, {}}); } return _upstream_dependency[id]; } + BasicSharedState* get_shared_state(int id) { + if (!_shared_states.contains(id)) { + return nullptr; + } + return _shared_states[id].get(); + } + bool is_pipelineX() const override { return true; } void wake_up(); @@ -190,9 +207,14 @@ private: std::vector<Dependency*> _finish_dependencies; RuntimeFilterDependency* _filter_dependency; + // Write dependencies of upstream pipeline tasks. DependencyMap _upstream_dependency; + // Read dependencies of this pipeline task. std::map<int, DependencySPtr> _source_dependency; + // Write dependencies of this pipeline tasks. std::vector<DependencySPtr> _downstream_dependency; + // All shared states of this pipeline task. + std::map<int, std::shared_ptr<BasicSharedState>> _shared_states; std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<LocalExchangeSinkDependency>>> _le_state_map; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org