This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a7c0dddbc92 [refactor](rename) Rename some variables in pipeline for 
better readability (#29140)
a7c0dddbc92 is described below

commit a7c0dddbc9206ed47812ba2fb9bdadac0add067c
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Thu Dec 28 12:54:47 2023 +0800

    [refactor](rename) Rename some variables in pipeline for better readability 
(#29140)
    
    * rft-rename
    
    * format
---
 be/src/pipeline/pipeline.cpp                  |  6 ++--
 be/src/pipeline/pipeline.h                    |  6 ++--
 be/src/pipeline/pipeline_fragment_context.cpp | 42 +++++++++++++++------------
 3 files changed, 29 insertions(+), 25 deletions(-)

diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index d8ac73374f5..7990f84df49 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -71,14 +71,14 @@ Status Pipeline::prepare(RuntimeState* state) {
     return Status::OK();
 }
 
-Status Pipeline::set_sink(OperatorBuilderPtr& sink_) {
-    if (_sink) {
+Status Pipeline::set_sink_builder(OperatorBuilderPtr& sink_) {
+    if (_sink_builder) {
         return Status::InternalError("set sink twice");
     }
     if (!sink_->is_sink()) {
         return Status::InternalError("should set a sink operator but {}", 
typeid(sink_).name());
     }
-    _sink = sink_;
+    _sink_builder = sink_;
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 2775c45019e..ef0acfba258 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -96,10 +96,10 @@ public:
     // prepare operators for pipelineX
     Status prepare(RuntimeState* state);
 
-    Status set_sink(OperatorBuilderPtr& sink_operator);
+    Status set_sink_builder(OperatorBuilderPtr& sink_operator_builder);
     Status set_sink(DataSinkOperatorXPtr& sink_operator);
 
-    OperatorBuilderBase* sink() { return _sink.get(); }
+    OperatorBuilderBase* get_sink_builder() { return _sink_builder.get(); }
     DataSinkOperatorXBase* sink_x() { return _sink_x.get(); }
     OperatorXs& operator_xs() { return operatorXs; }
     DataSinkOperatorXPtr sink_shared_pointer() { return _sink_x; }
@@ -185,7 +185,7 @@ private:
     void _init_profile();
 
     OperatorBuilders _operator_builders; // left is _source, right is _root
-    OperatorBuilderPtr _sink;            // put block to sink
+    OperatorBuilderPtr _sink_builder;    // put block to sink
 
     std::mutex _depend_mutex;
     std::vector<std::pair<int, std::weak_ptr<Pipeline>>> _parents;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 034fbaf2951..695fd6f4d3d 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -35,6 +35,7 @@
 
 #include "common/config.h"
 #include "common/logging.h"
+#include "common/status.h"
 #include "exec/data_sink.h"
 #include "exec/exec_node.h"
 #include "exec/scan_node.h"
@@ -332,6 +333,7 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
     _runtime_state->set_num_local_sink(request.num_local_sink);
 
     if (request.fragment.__isset.output_sink) {
+        // Here we build a DataSink object, which will be hold by 
DataSinkOperator
         RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
                 _runtime_state->obj_pool(), request.fragment.output_sink,
                 request.fragment.output_exprs, request, idx, 
_root_plan->row_desc(),
@@ -343,6 +345,7 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
     _root_pipeline->set_collect_query_statistics_with_every_batch();
     RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline));
     if (_sink) {
+        // DataSinkOperator is builded here
         RETURN_IF_ERROR(_create_sink(request.local_params[idx].sender_id,
                                      request.fragment.output_sink, 
_runtime_state.get()));
     }
@@ -366,14 +369,15 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
     _total_tasks = 0;
     for (PipelinePtr& pipeline : _pipelines) {
         // if sink
-        auto sink = pipeline->sink()->build_operator();
+        auto sink_operator = pipeline->get_sink_builder()->build_operator();
         // TODO pipeline 1 need to add new interface for exec node and operator
-        static_cast<void>(sink->init(request.fragment.output_sink));
+        RETURN_IF_ERROR(sink_operator->init(request.fragment.output_sink));
 
         RETURN_IF_ERROR(pipeline->build_operators());
-        auto task = std::make_unique<PipelineTask>(pipeline, _total_tasks++, 
_runtime_state.get(),
-                                                   sink, this, 
pipeline->pipeline_profile());
-        static_cast<void>(sink->set_child(task->get_root()));
+        auto task =
+                std::make_unique<PipelineTask>(pipeline, _total_tasks++, 
_runtime_state.get(),
+                                               sink_operator, this, 
pipeline->pipeline_profile());
+        RETURN_IF_ERROR(sink_operator->set_child(task->get_root()));
         _tasks.emplace_back(std::move(task));
         _runtime_profile->add_child(pipeline->pipeline_profile(), true, 
nullptr);
     }
@@ -524,7 +528,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
                 RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id), 
new_child_pipeline));
                 OperatorBuilderPtr child_sink_builder = 
std::make_shared<UnionSinkOperatorBuilder>(
                         union_node->id(), child_id, union_node, data_queue);
-                
RETURN_IF_ERROR(new_child_pipeline->set_sink(child_sink_builder));
+                
RETURN_IF_ERROR(new_child_pipeline->set_sink_builder(child_sink_builder));
             }
             OperatorBuilderPtr source_builder = 
std::make_shared<UnionSourceOperatorBuilder>(
                     node->id(), union_node, data_queue);
@@ -541,7 +545,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
             OperatorBuilderPtr pre_agg_sink =
                     
std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
                                                                               
data_queue);
-            RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
+            RETURN_IF_ERROR(new_pipe->set_sink_builder(pre_agg_sink));
 
             OperatorBuilderPtr pre_agg_source =
                     
std::make_shared<DistinctStreamingAggSourceOperatorBuilder>(
@@ -551,7 +555,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
             auto data_queue = std::make_shared<DataQueue>(1);
             OperatorBuilderPtr pre_agg_sink = 
std::make_shared<StreamingAggSinkOperatorBuilder>(
                     node->id(), agg_node, data_queue);
-            RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
+            RETURN_IF_ERROR(new_pipe->set_sink_builder(pre_agg_sink));
 
             OperatorBuilderPtr pre_agg_source = 
std::make_shared<StreamingAggSourceOperatorBuilder>(
                     node->id(), agg_node, data_queue);
@@ -559,7 +563,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
         } else {
             OperatorBuilderPtr agg_sink =
                     std::make_shared<AggSinkOperatorBuilder>(node->id(), 
agg_node);
-            RETURN_IF_ERROR(new_pipe->set_sink(agg_sink));
+            RETURN_IF_ERROR(new_pipe->set_sink_builder(agg_sink));
 
             OperatorBuilderPtr agg_source =
                     std::make_shared<AggSourceOperatorBuilder>(node->id(), 
agg_node);
@@ -572,7 +576,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
         RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
 
         OperatorBuilderPtr sort_sink = 
std::make_shared<SortSinkOperatorBuilder>(node->id(), node);
-        RETURN_IF_ERROR(new_pipeline->set_sink(sort_sink));
+        RETURN_IF_ERROR(new_pipeline->set_sink_builder(sort_sink));
 
         OperatorBuilderPtr sort_source =
                 std::make_shared<SortSourceOperatorBuilder>(node->id(), node);
@@ -585,7 +589,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
 
         OperatorBuilderPtr partition_sort_sink =
                 std::make_shared<PartitionSortSinkOperatorBuilder>(node->id(), 
node);
-        RETURN_IF_ERROR(new_pipeline->set_sink(partition_sort_sink));
+        RETURN_IF_ERROR(new_pipeline->set_sink_builder(partition_sort_sink));
 
         OperatorBuilderPtr partition_sort_source =
                 
std::make_shared<PartitionSortSourceOperatorBuilder>(node->id(), node);
@@ -598,7 +602,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
 
         OperatorBuilderPtr analytic_sink =
                 std::make_shared<AnalyticSinkOperatorBuilder>(node->id(), 
node);
-        RETURN_IF_ERROR(new_pipeline->set_sink(analytic_sink));
+        RETURN_IF_ERROR(new_pipeline->set_sink_builder(analytic_sink));
 
         OperatorBuilderPtr analytic_source =
                 std::make_shared<AnalyticSourceOperatorBuilder>(node->id(), 
node);
@@ -637,7 +641,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
         }
         OperatorBuilderPtr join_sink =
                 std::make_shared<HashJoinBuildSinkBuilder>(node->id(), 
join_node);
-        RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
+        RETURN_IF_ERROR(new_pipe->set_sink_builder(join_sink));
 
         RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
         OperatorBuilderPtr join_source =
@@ -652,7 +656,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
         RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe));
         OperatorBuilderPtr join_sink =
                 std::make_shared<NestLoopJoinBuildOperatorBuilder>(node->id(), 
node);
-        RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
+        RETURN_IF_ERROR(new_pipe->set_sink_builder(join_sink));
 
         RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
         OperatorBuilderPtr join_source =
@@ -690,7 +694,7 @@ Status 
PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode
     RETURN_IF_ERROR(_build_pipelines(node->child(0), build_pipeline));
     OperatorBuilderPtr sink_builder =
             std::make_shared<SetSinkOperatorBuilder<is_intersect>>(node->id(), 
node);
-    RETURN_IF_ERROR(build_pipeline->set_sink(sink_builder));
+    RETURN_IF_ERROR(build_pipeline->set_sink_builder(sink_builder));
 
     for (int child_id = 1; child_id < node->children_count(); ++child_id) {
         auto probe_pipeline = add_pipeline();
@@ -698,7 +702,7 @@ Status 
PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode
         OperatorBuilderPtr probe_sink_builder =
                 
std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>(node->id(), 
child_id,
                                                                             
node);
-        RETURN_IF_ERROR(probe_pipeline->set_sink(probe_sink_builder));
+        RETURN_IF_ERROR(probe_pipeline->set_sink_builder(probe_sink_builder));
     }
 
     OperatorBuilderPtr source_builder =
@@ -827,7 +831,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, 
const TDataSink& thr
     case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
         sink_ = 
std::make_shared<MultiCastDataStreamSinkOperatorBuilder>(next_operator_builder_id(),
                                                                          
_sink.get());
-        RETURN_IF_ERROR(_root_pipeline->set_sink(sink_));
+        RETURN_IF_ERROR(_root_pipeline->set_sink_builder(sink_));
 
         auto& multi_cast_data_streamer =
                 assert_cast<vectorized::MultiCastDataStreamSink*>(_sink.get())
@@ -862,7 +866,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, 
const TDataSink& thr
             // 3. create and set sink operator of data stream sender for new 
pipeline
             OperatorBuilderPtr sink_op_builder = 
std::make_shared<ExchangeSinkOperatorBuilder>(
                     next_operator_builder_id(), 
_multi_cast_stream_sink_senders[i].get(), i);
-            static_cast<void>(new_pipeline->set_sink(sink_op_builder));
+            static_cast<void>(new_pipeline->set_sink_builder(sink_op_builder));
 
             // 4. init and prepare the data_stream_sender of diff exchange
             TDataSink t;
@@ -876,7 +880,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, 
const TDataSink& thr
     default:
         return Status::InternalError("Unsuported sink type in pipeline: {}", 
thrift_sink.type);
     }
-    return _root_pipeline->set_sink(sink_);
+    return _root_pipeline->set_sink_builder(sink_);
 }
 
 // If all pipeline tasks binded to the fragment instance are finished, then we 
could


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to