This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ba882dea21 [pipelineX](dependency) Build DAG between pipelines (#23355)
ba882dea21 is described below

commit ba882dea2170c0d01ebdcb5ea7932c395cb0880e
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Wed Aug 23 13:21:32 2023 +0800

    [pipelineX](dependency) Build DAG between pipelines (#23355)
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp | 11 ----
 be/src/pipeline/exec/aggregation_sink_operator.h   |  1 -
 .../pipeline/exec/aggregation_source_operator.cpp  | 14 ++++-
 be/src/pipeline/exec/aggregation_source_operator.h |  1 +
 be/src/pipeline/pipeline.h                         |  2 +
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 63 +++++++++++++++++++---
 .../pipeline_x/pipeline_x_fragment_context.h       |  3 ++
 be/src/vec/common/sort/vsort_exec_exprs.cpp        |  1 +
 8 files changed, 75 insertions(+), 21 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 6db0162325..854a4ed122 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -56,7 +56,6 @@ AggSinkLocalState::AggSinkLocalState(DataSinkOperatorX* parent, RuntimeState* st
           _merge_timer(nullptr),
           _serialize_data_timer(nullptr),
           _deserialize_data_timer(nullptr),
-          _hash_table_size_counter(nullptr),
           _max_row_size_counter(nullptr) {}
 
 Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
@@ -96,7 +95,6 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
     _deserialize_data_timer = ADD_TIMER(profile(), "DeserializeAndMergeTime");
     _hash_table_compute_timer = ADD_TIMER(profile(), "HashTableComputeTime");
     _hash_table_emplace_timer = ADD_TIMER(profile(), "HashTableEmplaceTime");
-    _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT);
     _hash_table_input_counter = ADD_COUNTER(profile(), "HashTableInputCount", TUnit::UNIT);
     _max_row_size_counter = ADD_COUNTER(profile(), "MaxRowSizeInBytes", TUnit::UNIT);
     COUNTER_SET(_max_row_size_counter, (int64_t)0);
@@ -863,15 +861,6 @@ Status AggSinkOperatorX::setup_local_state(RuntimeState* state, LocalSinkStateIn
 Status AggSinkOperatorX::close(RuntimeState* state) {
     auto& local_state = state->get_sink_local_state(id())->cast<AggSinkLocalState>();
 
-    /// _hash_table_size_counter may be null if prepare failed.
-    if (local_state._hash_table_size_counter) {
-        std::visit(
-                [&](auto&& agg_method) {
-                    COUNTER_SET(local_state._hash_table_size_counter,
-                                int64_t(agg_method.data.size()));
-                },
-                local_state._agg_data->method_variant);
-    }
     local_state._preagg_block.clear();
 
     vectorized::PODArray<vectorized::AggregateDataPtr> tmp_places;
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 72136cd3f3..34267eb270 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -279,7 +279,6 @@ protected:
     RuntimeProfile::Counter* _merge_timer;
     RuntimeProfile::Counter* _serialize_data_timer;
     RuntimeProfile::Counter* _deserialize_data_timer;
-    RuntimeProfile::Counter* _hash_table_size_counter;
     RuntimeProfile::Counter* _max_row_size_counter;
     RuntimeProfile::Counter* _memory_usage_counter;
     RuntimeProfile::Counter* _hash_table_memory_usage;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 5dee84b216..1c9d9864d7 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -33,7 +33,8 @@ AggLocalState::AggLocalState(RuntimeState* state, OperatorXBase* parent)
           _serialize_result_timer(nullptr),
           _hash_table_iterate_timer(nullptr),
           _insert_keys_to_column_timer(nullptr),
-          _serialize_data_timer(nullptr) {}
+          _serialize_data_timer(nullptr),
+          _hash_table_size_counter(nullptr) {}
 
 Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(PipelineXLocalState::init(state, info));
@@ -45,6 +46,7 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     _hash_table_iterate_timer = ADD_TIMER(profile(), "HashTableIterateTime");
     _insert_keys_to_column_timer = ADD_TIMER(profile(), "InsertKeysToColumnTime");
     _serialize_data_timer = ADD_TIMER(profile(), "SerializeDataTime");
+    _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT);
     auto& p = _parent->cast<AggSourceOperatorX>();
     if (p._without_key) {
         if (p._needs_finalize) {
@@ -527,6 +529,16 @@ Status AggSourceOperatorX::close(RuntimeState* state) {
         local_state._executor.close();
     }
 
+    /// _hash_table_size_counter may be null if prepare failed.
+    if (local_state._hash_table_size_counter) {
+        std::visit(
+                [&](auto&& agg_method) {
+                    COUNTER_SET(local_state._hash_table_size_counter,
+                                int64_t(agg_method.data.size()));
+                },
+                local_state._agg_data->method_variant);
+    }
+
     local_state._shared_state->agg_data = nullptr;
     local_state._shared_state->aggregate_data_container = nullptr;
     local_state._shared_state->agg_arena_pool = nullptr;
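
The counter itself is unchanged; it simply moves from the sink's close() to the source's close(), since the source is the operator that ultimately drains the hash table. Below is a minimal standalone sketch of the std::visit pattern used above; Counter, COUNTER_SET, MethodOne, and MethodTwo are simplified stand-ins for the real RuntimeProfile::Counter, the Doris macro, and the aggregation-method variant, not the actual types.

    #include <cstdint>
    #include <iostream>
    #include <unordered_map>
    #include <variant>

    struct Counter { int64_t value = 0; };        // stand-in for RuntimeProfile::Counter
    #define COUNTER_SET(c, v) ((c)->value = (v))  // stand-in for the Doris macro

    // Two hypothetical aggregation methods; each exposes a `data` hash table.
    struct MethodOne { std::unordered_map<int32_t, int64_t> data; };
    struct MethodTwo { std::unordered_map<int64_t, int64_t> data; };
    using MethodVariant = std::variant<MethodOne, MethodTwo>;

    int main() {
        MethodVariant method_variant = MethodOne{{{1, 10}, {2, 20}}};
        Counter hash_table_size_counter;
        Counter* counter = &hash_table_size_counter;  // may be null if prepare failed

        if (counter) {
            // Whichever alternative is active, publish its hash table size.
            std::visit([&](auto&& agg_method) {
                COUNTER_SET(counter, int64_t(agg_method.data.size()));
            }, method_variant);
        }
        std::cout << counter->value << std::endl;  // prints 2
    }
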
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h
index 6831a110e6..df99f75023 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -88,6 +88,7 @@ private:
     RuntimeProfile::Counter* _hash_table_iterate_timer;
     RuntimeProfile::Counter* _insert_keys_to_column_timer;
     RuntimeProfile::Counter* _serialize_data_timer;
+    RuntimeProfile::Counter* _hash_table_size_counter;
 
     using vectorized_get_result = std::function<Status(
             RuntimeState* state, vectorized::Block* block, SourceState& source_state)>;
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 38d8f1df00..114f51071d 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -94,6 +94,8 @@ public:
         return _operators[_operators.size() - 1]->row_desc();
     }
 
+    PipelineId id() const { return _pipeline_id; }
+
 private:
     void _init_profile();
     std::atomic<uint32_t> _complete_dependency;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 0cb7d24405..13e89e8766 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -295,21 +295,51 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
 
         _runtime_states[i]->set_desc_tbl(_query_ctx->desc_tbl);
 
-        for (int pip_id = _pipelines.size() - 1; pip_id >= 0; pip_id--) {
+        std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
+        for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
             auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
-                                                 _pipelines[pip_id]->operator_xs().front()->id(),
+                                                 _pipelines[pip_idx]->operator_xs().front()->id(),
                                                  no_scan_ranges);
 
             auto task = std::make_unique<PipelineXTask>(
-                    _pipelines[pip_id], _total_tasks++, _runtime_states[i].get(), this,
-                    _pipelines[pip_id]->pipeline_profile(), scan_ranges, local_params.sender_id);
+                    _pipelines[pip_idx], _total_tasks++, _runtime_states[i].get(), this,
+                    _pipelines[pip_idx]->pipeline_profile(), scan_ranges, local_params.sender_id);
+            pipeline_id_to_task.insert({_pipelines[pip_idx]->id(), task.get()});
             RETURN_IF_ERROR(task->prepare(_runtime_states[i].get()));
-            _runtime_profile->add_child(_pipelines[pip_id]->pipeline_profile(), true, nullptr);
-            if (pip_id < _pipelines.size() - 1) {
-                task->set_upstream_dependency(_tasks[i].back()->get_downstream_dependency());
-            }
+            _runtime_profile->add_child(_pipelines[pip_idx]->pipeline_profile(), true, nullptr);
             _tasks[i].emplace_back(std::move(task));
         }
+
+        /**
+         * Build DAG for pipeline tasks.
+         * For example, we have
+         *
+         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
+         *            \                      /
+         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
+         *                 \                          /
+         *               JoinProbeOperator2 (Pipeline1)
+         *
+         * In this fragment, we have three pipelines, and pipeline 1 depends on pipeline 2 and pipeline 3.
+         * To build this DAG, `_dag` manages dependencies between pipelines by pipeline ID, and
+         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
+         *
+         * Finally, we have two upstream dependencies in Pipeline1, corresponding to JoinProbeOperator1
+         * and JoinProbeOperator2.
+         */
+
+        for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
+            auto task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
+            DCHECK(task != nullptr);
+
+            if (_dag.find(_pipelines[pip_idx]->id()) != _dag.end()) {
+                auto& deps = _dag[_pipelines[pip_idx]->id()];
+                for (auto& dep : deps) {
+                    task->set_upstream_dependency(
+                            pipeline_id_to_task[dep]->get_downstream_dependency());
+                }
+            }
+        }
     }
     // register the profile of child data stream sender
     //    for (auto& sender : _multi_cast_stream_sink_senders) {
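
To make the two passes above concrete, here is a self-contained model of the wiring; PipelineId, Dependency, and Task are simplified stand-ins for the real PipelineXTask and dependency classes, and the three-pipeline topology mirrors the join example from the comment.

    #include <cassert>
    #include <map>
    #include <memory>
    #include <vector>

    using PipelineId = int;
    struct Dependency {};
    struct Task {
        std::shared_ptr<Dependency> downstream = std::make_shared<Dependency>();
        std::vector<std::shared_ptr<Dependency>> upstreams;
        std::shared_ptr<Dependency> get_downstream_dependency() { return downstream; }
        void set_upstream_dependency(std::shared_ptr<Dependency> d) {
            upstreams.push_back(std::move(d));
        }
    };

    int main() {
        // Edges recorded by _create_operator: downstream id -> upstream ids.
        std::map<PipelineId, std::vector<PipelineId>> dag = {{1, {2, 3}}};

        // Pass 1: create one task per pipeline and index it by pipeline ID.
        std::map<PipelineId, Task*> pipeline_id_to_task;
        std::vector<std::unique_ptr<Task>> tasks;
        for (PipelineId id : {1, 2, 3}) {
            tasks.push_back(std::make_unique<Task>());
            pipeline_id_to_task[id] = tasks.back().get();
        }

        // Pass 2: wire each task to its upstream pipelines' downstream dependencies.
        for (auto& [id, task] : pipeline_id_to_task) {
            if (auto it = dag.find(id); it != dag.end()) {
                for (PipelineId dep : it->second) {
                    task->set_upstream_dependency(
                            pipeline_id_to_task[dep]->get_downstream_dependency());
                }
            }
        }
        assert(pipeline_id_to_task[1]->upstreams.size() == 2);  // Pipeline1 waits on 2 and 3
    }

Two passes matter here: pipeline_id_to_task must be fully populated before any edge is wired, since an upstream pipeline may be created later in _pipelines than the task that depends on it.
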
@@ -475,7 +505,12 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
                                                      "StreamingAggSourceXOperator"));
             RETURN_IF_ERROR(cur_pipe->add_operator(op));
 
+            const auto downstream_pipeline_id = cur_pipe->id();
+            if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+                _dag.insert({downstream_pipeline_id, {}});
+            }
             cur_pipe = add_pipeline();
+            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
             DataSinkOperatorXPtr sink;
             sink.reset(new StreamingAggSinkOperatorX(pool, tnode, descs));
             RETURN_IF_ERROR(cur_pipe->set_sink(sink));
@@ -484,7 +519,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             op.reset(new AggSourceOperatorX(pool, tnode, descs, "AggSourceXOperator"));
             RETURN_IF_ERROR(cur_pipe->add_operator(op));
 
+            const auto downstream_pipeline_id = cur_pipe->id();
+            if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+                _dag.insert({downstream_pipeline_id, {}});
+            }
             cur_pipe = add_pipeline();
+            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
+
             DataSinkOperatorXPtr sink;
             sink.reset(new AggSinkOperatorX(pool, tnode, descs));
             RETURN_IF_ERROR(cur_pipe->set_sink(sink));
@@ -496,7 +537,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         op.reset(new SortSourceOperatorX(pool, tnode, descs, "SortSourceXOperator"));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
 
+        const auto downstream_pipeline_id = cur_pipe->id();
+        if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+            _dag.insert({downstream_pipeline_id, {}});
+        }
         cur_pipe = add_pipeline();
+        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
+
         DataSinkOperatorXPtr sink;
         sink.reset(new SortSinkOperatorX(pool, tnode, descs));
         RETURN_IF_ERROR(cur_pipe->set_sink(sink));
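
The same four lines (remember the downstream pipeline's ID, create the upstream pipeline, record the edge in _dag) now appear three times: streaming agg, agg, and sort. A standalone model of that recurring pattern, with Pipeline and add_pipeline as simplified stand-ins and split_pipeline as a hypothetical helper name, not part of this commit:

    #include <cassert>
    #include <map>
    #include <memory>
    #include <vector>

    struct Pipeline {
        int pipeline_id;
        int id() const { return pipeline_id; }
    };

    static int next_id = 0;
    static std::vector<std::shared_ptr<Pipeline>> pipelines;
    static std::map<int, std::vector<int>> dag;  // downstream id -> upstream ids

    std::shared_ptr<Pipeline> add_pipeline() {
        pipelines.push_back(std::make_shared<Pipeline>(Pipeline{next_id++}));
        return pipelines.back();
    }

    // Record the edge downstream -> new upstream pipeline, then hand back the
    // new pipeline so the caller can attach its sink to it.
    std::shared_ptr<Pipeline> split_pipeline(const std::shared_ptr<Pipeline>& cur_pipe) {
        const auto downstream_pipeline_id = cur_pipe->id();
        if (dag.find(downstream_pipeline_id) == dag.end()) {
            dag.insert({downstream_pipeline_id, {}});
        }
        auto new_pipe = add_pipeline();
        dag[downstream_pipeline_id].push_back(new_pipe->id());
        return new_pipe;
    }

    int main() {
        auto probe = add_pipeline();          // e.g. Pipeline1 from the comment above
        auto build1 = split_pipeline(probe);  // e.g. Pipeline2 (a join build side)
        auto build2 = split_pipeline(probe);  // e.g. Pipeline3 (another build side)
        assert((dag[probe->id()] == std::vector<int>{build1->id(), build2->id()}));
    }
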
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index fd27464491..075b2bc931 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -125,6 +125,9 @@ private:
     DataSinkOperatorXPtr _sink;
 
     std::atomic_bool _canceled = false;
+
+    // `_dag` manages dependencies between pipelines by pipeline ID
+    std::map<PipelineId, std::vector<PipelineId>> _dag;
 };
 } // namespace pipeline
 } // namespace doris
diff --git a/be/src/vec/common/sort/vsort_exec_exprs.cpp b/be/src/vec/common/sort/vsort_exec_exprs.cpp
index 9b9b91426d..cb3aaa6d65 100644
--- a/be/src/vec/common/sort/vsort_exec_exprs.cpp
+++ b/be/src/vec/common/sort/vsort_exec_exprs.cpp
@@ -98,6 +98,7 @@ Status VSortExecExprs::clone(RuntimeState* state, VSortExecExprs& new_exprs) {
         RETURN_IF_ERROR(
                 _rhs_ordering_expr_ctxs[i]->clone(state, new_exprs._rhs_ordering_expr_ctxs[i]));
     }
+    new_exprs._sort_tuple_slot_expr_ctxs.resize(_sort_tuple_slot_expr_ctxs.size());
     for (size_t i = 0; i < _sort_tuple_slot_expr_ctxs.size(); i++) {
         RETURN_IF_ERROR(_sort_tuple_slot_expr_ctxs[i]->clone(
                 state, new_exprs._sort_tuple_slot_expr_ctxs[i]));
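
The vsort_exec_exprs.cpp change is a standalone bug fix: clone() copied into new_exprs._sort_tuple_slot_expr_ctxs[i] without first sizing the destination vector, so on a freshly constructed target that indexing would run out of bounds. A minimal model of the fix; Ctx and its clone() are simplified stand-ins for the real expression-context type:

    #include <cassert>
    #include <vector>

    struct Ctx {
        int payload = 0;
        void clone(Ctx& dst) const { dst.payload = payload; }
    };

    int main() {
        std::vector<Ctx> src(3, Ctx{42});
        std::vector<Ctx> dst;        // a freshly constructed clone target starts empty

        dst.resize(src.size());      // the fix: size the destination first
        for (size_t i = 0; i < src.size(); i++) {
            src[i].clone(dst[i]);    // without the resize, dst[i] would be out of bounds
        }
        assert(dst[2].payload == 42);
    }
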

