This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3838b6fbaeb [refine](pipelineX) refine some code in pipelineX (#27472)
3838b6fbaeb is described below

commit 3838b6fbaeb2034b0bebb04cf605951c23a2e5ed
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Mon Nov 27 11:04:16 2023 +0800

    [refine](pipelineX) refine some code in pipelineX (#27472)
---
 be/src/pipeline/pipeline_x/operator.cpp            |  5 --
 be/src/pipeline/pipeline_x/operator.h              |  1 -
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 56 ++++++++++++----------
 .../pipeline_x/pipeline_x_fragment_context.h       | 12 +++--
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     | 20 ++++----
 be/src/runtime/runtime_state.cpp                   | 11 +++--
 be/src/runtime/runtime_state.h                     |  2 +-
 be/src/vec/sink/writer/async_result_writer.cpp     |  6 ---
 be/src/vec/sink/writer/async_result_writer.h       |  2 -
 9 files changed, 54 insertions(+), 61 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index c0f3a6f029a..6f305d86efa 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -528,11 +528,6 @@ Status AsyncWriterSink<Writer, Parent>::sink(RuntimeState* state, vectorized::Bl
     return _writer->sink(block, source_state == SourceState::FINISHED);
 }
 
-template <typename Writer, typename Parent>
-Dependency* AsyncWriterSink<Writer, Parent>::write_blocked_by(PipelineXTask* 
task) {
-    return _writer->write_blocked_by(task);
-}
-
 template <typename Writer, typename Parent>
 Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 28f156a15a9..2dc71dec96a 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -640,7 +640,6 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state);
 
-    Dependency* write_blocked_by(PipelineXTask* task);
     Dependency* dependency() override { return _async_writer_dependency.get(); }
     Status close(RuntimeState* state, Status exec_status) override;
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 3d7b8ea42b9..6a40f86dede 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -261,7 +261,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
                 params.__isset.send_query_statistics_with_every_batch
                         ? params.send_query_statistics_with_every_batch
                         : false;
-        _sink.reset(new ExchangeSinkOperatorX(state, row_desc, next_operator_id(),
+        _sink.reset(new ExchangeSinkOperatorX(state, row_desc, next_sink_operator_id(),
                                               thrift_sink.stream_sink, params.destinations,
                                               send_query_statistics_with_every_batch));
         break;
@@ -272,18 +272,18 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
         }
 
         // TODO: figure out good buffer size based on size of output row
-        _sink.reset(new ResultSinkOperatorX(next_operator_id(), row_desc, output_exprs,
+        _sink.reset(new ResultSinkOperatorX(next_sink_operator_id(), row_desc, output_exprs,
                                             thrift_sink.result_sink));
         break;
     }
     case TDataSinkType::OLAP_TABLE_SINK: {
         if (state->query_options().enable_memtable_on_sink_node &&
             !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
-            _sink.reset(new OlapTableSinkV2OperatorX(pool, next_operator_id(), row_desc,
+            _sink.reset(new OlapTableSinkV2OperatorX(pool, next_sink_operator_id(), row_desc,
                                                      output_exprs, false));
         } else {
-            _sink.reset(new OlapTableSinkOperatorX(pool, next_operator_id(), row_desc, output_exprs,
-                                                   false));
+            _sink.reset(new OlapTableSinkOperatorX(pool, next_sink_operator_id(), row_desc,
+                                                   output_exprs, false));
         }
         break;
     }
@@ -292,7 +292,8 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
             return Status::InternalError("Missing data jdbc sink.");
         }
         if (config::enable_java_support) {
-            _sink.reset(new JdbcTableSinkOperatorX(row_desc, next_operator_id(), output_exprs));
+            _sink.reset(
+                    new JdbcTableSinkOperatorX(row_desc, next_sink_operator_id(), output_exprs));
         } else {
             return Status::InternalError(
                     "Jdbc table sink is not enabled, you can change be config "
@@ -313,10 +314,12 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
         // Result file sink is not the top sink
         if (params.__isset.destinations && params.destinations.size() > 0) {
             _sink.reset(new ResultFileSinkOperatorX(
-                    next_operator_id(), row_desc, thrift_sink.result_file_sink, params.destinations,
-                    send_query_statistics_with_every_batch, output_exprs, desc_tbl));
+                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
+                    params.destinations, send_query_statistics_with_every_batch, output_exprs,
+                    desc_tbl));
         } else {
-            _sink.reset(new ResultFileSinkOperatorX(next_operator_id(), row_desc, output_exprs));
+            _sink.reset(
+                    new ResultFileSinkOperatorX(next_sink_operator_id(), row_desc, output_exprs));
         }
         break;
     }
@@ -324,7 +327,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
         DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
         DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
         // TODO: figure out good buffer size based on size of output row
-        auto sink_id = next_operator_id();
+        auto sink_id = next_sink_operator_id();
         auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
         // one sink has multiple sources.
         std::vector<int> sources;
@@ -359,7 +362,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
 
             DataSinkOperatorXPtr sink_op;
             sink_op.reset(new ExchangeSinkOperatorX(
-                    state, *_row_desc, next_operator_id(),
+                    state, *_row_desc, next_sink_operator_id(),
                     thrift_sink.multi_cast_stream_sink.sinks[i],
                     thrift_sink.multi_cast_stream_sink.destinations[i], false));
 
@@ -421,7 +424,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
         _runtime_states[i]->set_desc_tbl(_desc_tbl);
         _runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id);
         _runtime_states[i]->set_num_per_fragment_instances(request.num_senders);
-        _runtime_states[i]->resize_op_id_to_local_state(max_operator_id());
+        _runtime_states[i]->resize_op_id_to_local_state(max_operator_id(), max_sink_operator_id());
         _runtime_states[i]->set_load_stream_per_node(request.load_stream_per_node);
         _runtime_states[i]->set_total_load_streams(request.total_load_streams);
         _runtime_states[i]->set_num_local_sink(request.num_local_sink);
@@ -675,8 +678,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             cur_pipe = add_pipeline();
             _dag[downstream_pipeline_id].push_back(cur_pipe->id());
             DataSinkOperatorXPtr sink;
-            sink.reset(
-                    new DistinctStreamingAggSinkOperatorX(pool, next_operator_id(), tnode, descs));
+            sink.reset(new DistinctStreamingAggSinkOperatorX(pool, next_sink_operator_id(), tnode,
+                                                             descs));
             sink->set_dests_id({op->operator_id()});
             RETURN_IF_ERROR(cur_pipe->set_sink(sink));
             RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -692,7 +695,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             cur_pipe = add_pipeline();
             _dag[downstream_pipeline_id].push_back(cur_pipe->id());
             DataSinkOperatorXPtr sink;
-            sink.reset(new StreamingAggSinkOperatorX(pool, next_operator_id(), tnode, descs));
+            sink.reset(new StreamingAggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
             sink->set_dests_id({op->operator_id()});
             RETURN_IF_ERROR(cur_pipe->set_sink(sink));
             RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -708,7 +711,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
             DataSinkOperatorXPtr sink;
-            sink.reset(new AggSinkOperatorX<>(pool, next_operator_id(), tnode, descs));
+            sink.reset(new AggSinkOperatorX<>(pool, next_sink_operator_id(), tnode, descs));
             sink->set_dests_id({op->operator_id()});
             RETURN_IF_ERROR(cur_pipe->set_sink(sink));
             RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -733,7 +736,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
 
         DataSinkOperatorXPtr sink;
-        sink.reset(new HashJoinBuildSinkOperatorX(pool, next_operator_id(), tnode, descs));
+        sink.reset(new HashJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
         RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -753,7 +756,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
 
         DataSinkOperatorXPtr sink;
-        sink.reset(new NestedLoopJoinBuildSinkOperatorX(pool, next_operator_id(), tnode, descs));
+        sink.reset(
+                new NestedLoopJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
         RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -774,7 +778,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             PipelinePtr build_side_pipe = add_pipeline();
             _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
             DataSinkOperatorXPtr sink;
-            sink.reset(new UnionSinkOperatorX(i, next_operator_id(), pool, tnode, descs));
+            sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(), pool, tnode, descs));
             sink->set_dests_id({op->operator_id()});
             RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
             RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -795,7 +799,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
         DataSinkOperatorXPtr sink;
-        sink.reset(new SortSinkOperatorX(pool, next_operator_id(), tnode, descs));
+        sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(cur_pipe->set_sink(sink));
         RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -813,7 +817,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
         DataSinkOperatorXPtr sink;
-        sink.reset(new PartitionSortSinkOperatorX(pool, next_operator_id(), tnode, descs));
+        sink.reset(new PartitionSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(cur_pipe->set_sink(sink));
         RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -831,7 +835,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
         DataSinkOperatorXPtr sink;
-        sink.reset(new AnalyticSinkOperatorX(pool, next_operator_id(), tnode, descs));
+        sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(cur_pipe->set_sink(sink));
         RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -912,11 +916,11 @@ Status PipelineXFragmentContext::_build_operators_for_set_operation_node(
 
         DataSinkOperatorXPtr sink;
         if (child_id == 0) {
-            sink.reset(new SetSinkOperatorX<is_intersect>(child_id, next_operator_id(), pool, tnode,
-                                                          descs));
+            sink.reset(new SetSinkOperatorX<is_intersect>(child_id, next_sink_operator_id(), pool,
+                                                          tnode, descs));
         } else {
-            sink.reset(new SetProbeSinkOperatorX<is_intersect>(child_id, next_operator_id(), pool,
-                                                               tnode, descs));
+            sink.reset(new SetProbeSinkOperatorX<is_intersect>(child_id, next_sink_operator_id(),
+                                                               pool, tnode, descs));
         }
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(probe_side_pipe->set_sink(sink));
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 23ff08fcb0a..f579265ab63 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -104,7 +104,7 @@ public:
 
     RuntimeState* get_runtime_state(UniqueId fragment_instance_id) override {
         std::lock_guard<std::mutex> l(_state_map_lock);
-        if (_instance_id_to_runtime_state.count(fragment_instance_id) > 0) {
+        if (_instance_id_to_runtime_state.contains(fragment_instance_id)) {
             return _instance_id_to_runtime_state[fragment_instance_id];
         } else {
             return _runtime_state.get();
@@ -115,6 +115,10 @@ public:
 
     [[nodiscard]] int max_operator_id() const { return _operator_id; }
 
+    [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id++; }
+
+    [[nodiscard]] int max_sink_operator_id() const { return _sink_operator_id; }
+
     std::string debug_string() override;
 
 private:
@@ -203,11 +207,9 @@ private:
 
     std::map<UniqueId, RuntimeState*> _instance_id_to_runtime_state;
     std::mutex _state_map_lock;
-    // We can guarantee that a plan node ID can correspond to an operator ID,
-    // but some operators do not have a corresponding plan node ID.
-    // We set these IDs as negative numbers, which are not visible to the user.
-    int _operator_id = 0;
 
+    int _operator_id = 0;
+    int _sink_operator_id = 0;
     std::map<PipelineId, std::shared_ptr<LocalExchangeSharedState>> _op_id_to_le_state;
 };
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index aaf9c2a16f1..c96f34b213b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -83,19 +83,15 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
     std::vector<TScanRangeParams> no_scan_ranges;
     auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
                                          _operators.front()->node_id(), no_scan_ranges);
-
+    auto* parent_profile = _parent_profile;
     for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
-        auto& deps = get_upstream_dependency(_operators[op_idx]->operator_id());
-        LocalStateInfo info {
-                op_idx == _operators.size() - 1
-                        ? _parent_profile
-                        : state->get_local_state(_operators[op_idx + 1]->operator_id())->profile(),
-                scan_ranges,
-                deps,
-                _local_exchange_state,
-                _task_idx,
-                _source_dependency[_operators[op_idx]->operator_id()]};
-        RETURN_IF_ERROR(_operators[op_idx]->setup_local_state(state, info));
+        auto& op = _operators[op_idx];
+        auto& deps = get_upstream_dependency(op->operator_id());
+        LocalStateInfo info {parent_profile, scan_ranges,
+                             deps,           _local_exchange_state,
+                             _task_idx,      _source_dependency[op->operator_id()]};
+        RETURN_IF_ERROR(op->setup_local_state(state, info));
+        parent_profile = state->get_local_state(op->operator_id())->profile();
     }
 
     _block = doris::vectorized::Block::create_unique();
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index daf3e1bc75e..a8027d0d61b 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -23,6 +23,7 @@
 #include <fmt/format.h>
 #include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/Types_types.h>
+#include <glog/logging.h>
 
 #include <string>
 
@@ -424,13 +425,15 @@ int64_t RuntimeState::get_load_mem_limit() {
     }
 }
 
-void RuntimeState::resize_op_id_to_local_state(int size) {
-    _op_id_to_local_state.resize(size);
-    _op_id_to_sink_local_state.resize(size);
+void RuntimeState::resize_op_id_to_local_state(int operator_size, int sink_size) {
+    _op_id_to_local_state.resize(operator_size);
+    _op_id_to_sink_local_state.resize(sink_size);
 }
 
 void RuntimeState::emplace_local_state(
         int id, std::unique_ptr<doris::pipeline::PipelineXLocalStateBase> state) {
+    DCHECK(id < _op_id_to_local_state.size());
+    DCHECK(!_op_id_to_local_state[id]);
     _op_id_to_local_state[id] = std::move(state);
 }
 
@@ -451,6 +454,8 @@ Result<RuntimeState::LocalState*> RuntimeState::get_local_state_result(int id) {
 
 void RuntimeState::emplace_sink_local_state(
         int id, std::unique_ptr<doris::pipeline::PipelineXSinkLocalStateBase> state) {
+    DCHECK(id < _op_id_to_sink_local_state.size());
+    DCHECK(!_op_id_to_sink_local_state[id]);
     _op_id_to_sink_local_state[id] = std::move(state);
 }
 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 835bd582894..8c7b3bed9fc 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -500,7 +500,7 @@ public:
 
     Result<SinkLocalState*> get_sink_local_state_result(int id);
 
-    void resize_op_id_to_local_state(int size);
+    void resize_op_id_to_local_state(int operator_size, int sink_size);
 
 private:
     Status create_error_log_file();
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 8edde60adb4..422dc2efef4 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -172,11 +172,5 @@ std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::vectorized::Blo
     return b;
 }
 
-pipeline::Dependency* AsyncResultWriter::write_blocked_by(pipeline::PipelineXTask* task) {
-    std::lock_guard l(_m);
-    DCHECK(_dependency != nullptr);
-    return _dependency->is_blocked_by(task);
-}
-
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index 0a217b34e6b..e91ff1a0701 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -78,8 +78,6 @@ public:
         return _data_queue_is_available() || _is_finished();
     }
 
-    pipeline::Dependency* write_blocked_by(pipeline::PipelineXTask* task);
-
     [[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; }
 
     void process_block(RuntimeState* state, RuntimeProfile* profile);
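
Note on the change above: the core of this refactor is that sink operators now
draw their IDs from a counter separate from the plan-node operator counter
(next_sink_operator_id() / max_sink_operator_id()), so RuntimeState can size its
operator and sink local-state vectors independently and DCHECK that each slot is
filled exactly once. PipelineXTask::prepare() is also simplified to carry a
parent_profile pointer forward through the loop instead of re-reading the
previous operator's local state. A minimal sketch of the two-counter pattern
follows; the class names (IdAllocatorSketch, LocalStateRegistrySketch,
LocalState) are illustrative stand-ins, not the actual Doris types.

    #include <cassert>
    #include <memory>
    #include <utility>
    #include <vector>

    // Illustrative stand-in for PipelineXLocalStateBase / PipelineXSinkLocalStateBase.
    struct LocalState {};

    // Sketch of the two separate ID sequences: plan-node operators and sink
    // operators each get their own monotonically increasing counter.
    class IdAllocatorSketch {
    public:
        int next_operator_id() { return _operator_id++; }
        int max_operator_id() const { return _operator_id; }

        int next_sink_operator_id() { return _sink_operator_id++; }
        int max_sink_operator_id() const { return _sink_operator_id; }

    private:
        int _operator_id = 0;
        int _sink_operator_id = 0;
    };

    // Sketch of how the runtime state can now size and fill its two
    // local-state vectors independently.
    class LocalStateRegistrySketch {
    public:
        // Each vector is sized by its own counter instead of one shared size.
        void resize(int operator_size, int sink_size) {
            _op_id_to_local_state.resize(operator_size);
            _op_id_to_sink_local_state.resize(sink_size);
        }

        void emplace_sink_local_state(int id, std::unique_ptr<LocalState> state) {
            // Mirrors the new DCHECKs: the slot must exist and must still be empty.
            assert(id < static_cast<int>(_op_id_to_sink_local_state.size()));
            assert(!_op_id_to_sink_local_state[id]);
            _op_id_to_sink_local_state[id] = std::move(state);
        }

    private:
        std::vector<std::unique_ptr<LocalState>> _op_id_to_local_state;
        std::vector<std::unique_ptr<LocalState>> _op_id_to_sink_local_state;
    };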

