This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new ba882dea21 [pipelineX](dependency) Build DAG between pipelines (#23355)
ba882dea21 is described below

commit ba882dea2170c0d01ebdcb5ea7932c395cb0880e
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Wed Aug 23 13:21:32 2023 +0800

    [pipelineX](dependency) Build DAG between pipelines (#23355)
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp | 11 ----
 be/src/pipeline/exec/aggregation_sink_operator.h   |  1 -
 .../pipeline/exec/aggregation_source_operator.cpp  | 14 ++++-
 be/src/pipeline/exec/aggregation_source_operator.h |  1 +
 be/src/pipeline/pipeline.h                         |  2 +
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 63 +++++++++++++++++++---
 .../pipeline_x/pipeline_x_fragment_context.h       |  3 ++
 be/src/vec/common/sort/vsort_exec_exprs.cpp        |  1 +
 8 files changed, 75 insertions(+), 21 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 6db0162325..854a4ed122 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -56,7 +56,6 @@ AggSinkLocalState::AggSinkLocalState(DataSinkOperatorX* parent, RuntimeState* st
           _merge_timer(nullptr),
           _serialize_data_timer(nullptr),
           _deserialize_data_timer(nullptr),
-          _hash_table_size_counter(nullptr),
           _max_row_size_counter(nullptr) {}

 Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
@@ -96,7 +95,6 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
     _deserialize_data_timer = ADD_TIMER(profile(), "DeserializeAndMergeTime");
     _hash_table_compute_timer = ADD_TIMER(profile(), "HashTableComputeTime");
     _hash_table_emplace_timer = ADD_TIMER(profile(), "HashTableEmplaceTime");
-    _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT);
     _hash_table_input_counter = ADD_COUNTER(profile(), "HashTableInputCount", TUnit::UNIT);
     _max_row_size_counter = ADD_COUNTER(profile(), "MaxRowSizeInBytes", TUnit::UNIT);
     COUNTER_SET(_max_row_size_counter, (int64_t)0);
@@ -863,15 +861,6 @@ Status AggSinkOperatorX::setup_local_state(RuntimeState* state, LocalSinkStateIn
 Status AggSinkOperatorX::close(RuntimeState* state) {
     auto& local_state = state->get_sink_local_state(id())->cast<AggSinkLocalState>();
-    /// _hash_table_size_counter may be null if prepare failed.
-    if (local_state._hash_table_size_counter) {
-        std::visit(
-                [&](auto&& agg_method) {
-                    COUNTER_SET(local_state._hash_table_size_counter,
-                                int64_t(agg_method.data.size()));
-                },
-                local_state._agg_data->method_variant);
-    }

     local_state._preagg_block.clear();
     vectorized::PODArray<vectorized::AggregateDataPtr> tmp_places;
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 72136cd3f3..34267eb270 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -279,7 +279,6 @@ protected:
     RuntimeProfile::Counter* _merge_timer;
     RuntimeProfile::Counter* _serialize_data_timer;
     RuntimeProfile::Counter* _deserialize_data_timer;
-    RuntimeProfile::Counter* _hash_table_size_counter;
     RuntimeProfile::Counter* _max_row_size_counter;
     RuntimeProfile::Counter* _memory_usage_counter;
     RuntimeProfile::Counter* _hash_table_memory_usage;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 5dee84b216..1c9d9864d7 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -33,7 +33,8 @@ AggLocalState::AggLocalState(RuntimeState* state, OperatorXBase* parent)
           _serialize_result_timer(nullptr),
           _hash_table_iterate_timer(nullptr),
           _insert_keys_to_column_timer(nullptr),
-          _serialize_data_timer(nullptr) {}
+          _serialize_data_timer(nullptr),
+          _hash_table_size_counter(nullptr) {}

 Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(PipelineXLocalState::init(state, info));
@@ -45,6 +46,7 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     _hash_table_iterate_timer = ADD_TIMER(profile(), "HashTableIterateTime");
     _insert_keys_to_column_timer = ADD_TIMER(profile(), "InsertKeysToColumnTime");
     _serialize_data_timer = ADD_TIMER(profile(), "SerializeDataTime");
+    _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT);
     auto& p = _parent->cast<AggSourceOperatorX>();
     if (p._without_key) {
         if (p._needs_finalize) {
@@ -527,6 +529,16 @@ Status AggSourceOperatorX::close(RuntimeState* state) {
         local_state._executor.close();
     }

+    /// _hash_table_size_counter may be null if prepare failed.
+    if (local_state._hash_table_size_counter) {
+        std::visit(
+                [&](auto&& agg_method) {
+                    COUNTER_SET(local_state._hash_table_size_counter,
+                                int64_t(agg_method.data.size()));
+                },
+                local_state._agg_data->method_variant);
+    }
+
     local_state._shared_state->agg_data = nullptr;
     local_state._shared_state->aggregate_data_container = nullptr;
     local_state._shared_state->agg_arena_pool = nullptr;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h
index 6831a110e6..df99f75023 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -88,6 +88,7 @@ private:
     RuntimeProfile::Counter* _hash_table_iterate_timer;
     RuntimeProfile::Counter* _insert_keys_to_column_timer;
    RuntimeProfile::Counter* _serialize_data_timer;
+    RuntimeProfile::Counter* _hash_table_size_counter;

     using vectorized_get_result = std::function<Status(
             RuntimeState* state, vectorized::Block* block, SourceState& source_state)>;
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 38d8f1df00..114f51071d 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -94,6 +94,8 @@ public:
         return _operators[_operators.size() - 1]->row_desc();
     }

+    PipelineId id() const { return _pipeline_id; }
+
 private:
     void _init_profile();
     std::atomic<uint32_t> _complete_dependency;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 0cb7d24405..13e89e8766 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -295,21 +295,51 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
         _runtime_states[i]->set_desc_tbl(_query_ctx->desc_tbl);

-        for (int pip_id = _pipelines.size() - 1; pip_id >= 0; pip_id--) {
+        std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
+        for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
             auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
-                                                 _pipelines[pip_id]->operator_xs().front()->id(),
+                                                 _pipelines[pip_idx]->operator_xs().front()->id(),
                                                  no_scan_ranges);

             auto task = std::make_unique<PipelineXTask>(
-                    _pipelines[pip_id], _total_tasks++, _runtime_states[i].get(), this,
-                    _pipelines[pip_id]->pipeline_profile(), scan_ranges, local_params.sender_id);
+                    _pipelines[pip_idx], _total_tasks++, _runtime_states[i].get(), this,
+                    _pipelines[pip_idx]->pipeline_profile(), scan_ranges, local_params.sender_id);
+            pipeline_id_to_task.insert({_pipelines[pip_idx]->id(), task.get()});
             RETURN_IF_ERROR(task->prepare(_runtime_states[i].get()));
-            _runtime_profile->add_child(_pipelines[pip_id]->pipeline_profile(), true, nullptr);
-            if (pip_id < _pipelines.size() - 1) {
-                task->set_upstream_dependency(_tasks[i].back()->get_downstream_dependency());
-            }
+            _runtime_profile->add_child(_pipelines[pip_idx]->pipeline_profile(), true, nullptr);
             _tasks[i].emplace_back(std::move(task));
         }
+
+        /**
+         * Build DAG for pipeline tasks.
+         * For example, we have
+         *
+         *      ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
+         *                 \                   /
+         *           JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
+         *                          \                    /
+         *                    JoinProbeOperator2 (Pipeline1)
+         *
+         * In this fragment, we have three pipelines, and Pipeline1 depends on Pipeline2 and Pipeline3.
+         * To build this DAG, `_dag` manages the dependencies between pipelines by pipeline ID, and
+         * `pipeline_id_to_task` is used to find a task by its unique pipeline ID.
+         *
+         * Finally, we have two upstream dependencies in Pipeline1, corresponding to JoinProbeOperator1
+         * and JoinProbeOperator2.
+         */
+        for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
+            auto task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
+            DCHECK(task != nullptr);
+
+            if (_dag.find(_pipelines[pip_idx]->id()) != _dag.end()) {
+                auto& deps = _dag[_pipelines[pip_idx]->id()];
+                for (auto& dep : deps) {
+                    task->set_upstream_dependency(
+                            pipeline_id_to_task[dep]->get_downstream_dependency());
+                }
+            }
+        }
     }
     // register the profile of child data stream sender
     //    for (auto& sender : _multi_cast_stream_sink_senders) {
@@ -475,7 +505,12 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
                                                 "StreamingAggSourceXOperator"));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));

+        const auto downstream_pipeline_id = cur_pipe->id();
+        if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+            _dag.insert({downstream_pipeline_id, {}});
+        }
         cur_pipe = add_pipeline();
+        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
         DataSinkOperatorXPtr sink;
         sink.reset(new StreamingAggSinkOperatorX(pool, tnode, descs));
         RETURN_IF_ERROR(cur_pipe->set_sink(sink));
@@ -484,7 +519,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         op.reset(new AggSourceOperatorX(pool, tnode, descs, "AggSourceXOperator"));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));

+        const auto downstream_pipeline_id = cur_pipe->id();
+        if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+            _dag.insert({downstream_pipeline_id, {}});
+        }
         cur_pipe = add_pipeline();
+        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
+
         DataSinkOperatorXPtr sink;
         sink.reset(new AggSinkOperatorX(pool, tnode, descs));
         RETURN_IF_ERROR(cur_pipe->set_sink(sink));
@@ -496,7 +537,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         op.reset(new SortSourceOperatorX(pool, tnode, descs, "SortSourceXOperator"));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));

+        const auto downstream_pipeline_id = cur_pipe->id();
+        if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+            _dag.insert({downstream_pipeline_id, {}});
+        }
         cur_pipe = add_pipeline();
+        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
+
         DataSinkOperatorXPtr sink;
         sink.reset(new SortSinkOperatorX(pool, tnode, descs));
         RETURN_IF_ERROR(cur_pipe->set_sink(sink));
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index fd27464491..075b2bc931 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -125,6 +125,9 @@ private:
     DataSinkOperatorXPtr _sink;

     std::atomic_bool _canceled = false;
+
+    // `_dag` manages the dependencies between pipelines by pipeline ID.
+    std::map<PipelineId, std::vector<PipelineId>> _dag;
 };
 } // namespace pipeline
 } // namespace doris
diff --git a/be/src/vec/common/sort/vsort_exec_exprs.cpp b/be/src/vec/common/sort/vsort_exec_exprs.cpp
index 9b9b91426d..cb3aaa6d65 100644
--- a/be/src/vec/common/sort/vsort_exec_exprs.cpp
+++ b/be/src/vec/common/sort/vsort_exec_exprs.cpp
@@ -98,6 +98,7 @@ Status VSortExecExprs::clone(RuntimeState* state, VSortExecExprs& new_exprs) {
         RETURN_IF_ERROR(
                 _rhs_ordering_expr_ctxs[i]->clone(state, new_exprs._rhs_ordering_expr_ctxs[i]));
     }
+    new_exprs._sort_tuple_slot_expr_ctxs.resize(_sort_tuple_slot_expr_ctxs.size());
     for (size_t i = 0; i < _sort_tuple_slot_expr_ctxs.size(); i++) {
         RETURN_IF_ERROR(_sort_tuple_slot_expr_ctxs[i]->clone(
                 state, new_exprs._sort_tuple_slot_expr_ctxs[i]));
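The two-phase dependency wiring in the patch above is easier to see in isolation. Below is a minimal, self-contained sketch of the same idea; the types and names here (PipelineId, Task, upstreams) are simplified stand-ins for illustration, not the real Pipeline/PipelineXTask API. Phase one records, per downstream pipeline ID, the upstream pipeline IDs it must wait for (the role of `_dag` in `_create_operator`); phase two creates one task per pipeline and resolves each recorded ID through a pipeline-ID-to-task map (the role of `pipeline_id_to_task` in `_build_pipeline_tasks`).

    #include <cstdint>
    #include <map>
    #include <memory>
    #include <vector>

    // Hypothetical stand-ins for the real Pipeline/PipelineXTask types.
    using PipelineId = uint32_t;

    struct Task {
        PipelineId pipeline_id;
        std::vector<Task*> upstreams; // tasks this task must wait for
    };

    int main() {
        // Phase 1 (mirrors _create_operator): each time an operator splits a
        // pipeline, record "downstream waits for upstream" by pipeline ID.
        std::map<PipelineId, std::vector<PipelineId>> dag;
        dag[1].push_back(2); // Pipeline1 waits for the build side in Pipeline2
        dag[1].push_back(3); // Pipeline1 waits for the build side in Pipeline3

        // Phase 2 (mirrors _build_pipeline_tasks): create one task per
        // pipeline, then wire dependencies via a pipeline-ID -> task lookup.
        std::map<PipelineId, std::unique_ptr<Task>> pipeline_id_to_task;
        for (PipelineId id : {1u, 2u, 3u}) {
            pipeline_id_to_task[id] = std::make_unique<Task>(Task{id, {}});
        }
        for (auto& [id, task] : pipeline_id_to_task) {
            if (auto it = dag.find(id); it != dag.end()) {
                for (PipelineId dep : it->second) {
                    task->upstreams.push_back(pipeline_id_to_task[dep].get());
                }
            }
        }
        // Pipeline1's task now holds two upstream links (Pipeline2, Pipeline3),
        // matching the three-pipeline example in the commit's comment block.
        return 0;
    }

Keeping the DAG keyed by pipeline ID, rather than relying on creation order as the old `pip_id < _pipelines.size() - 1` check did, is what lets one pipeline depend on several upstream pipelines at once.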