This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new a7c0dddbc92 [refactor](rename) Rename some variables in pipeline for better readability (#29140)
a7c0dddbc92 is described below

commit a7c0dddbc9206ed47812ba2fb9bdadac0add067c
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Thu Dec 28 12:54:47 2023 +0800

    [refactor](rename) Rename some variables in pipeline for better readability (#29140)

    * rft-rename

    * format
---
 be/src/pipeline/pipeline.cpp                  |  6 ++--
 be/src/pipeline/pipeline.h                    |  6 ++--
 be/src/pipeline/pipeline_fragment_context.cpp | 42 +++++++++++++++------------
 3 files changed, 29 insertions(+), 25 deletions(-)

diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index d8ac73374f5..7990f84df49 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -71,14 +71,14 @@ Status Pipeline::prepare(RuntimeState* state) {
     return Status::OK();
 }
 
-Status Pipeline::set_sink(OperatorBuilderPtr& sink_) {
-    if (_sink) {
+Status Pipeline::set_sink_builder(OperatorBuilderPtr& sink_) {
+    if (_sink_builder) {
         return Status::InternalError("set sink twice");
     }
     if (!sink_->is_sink()) {
         return Status::InternalError("should set a sink operator but {}", typeid(sink_).name());
     }
-    _sink = sink_;
+    _sink_builder = sink_;
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 2775c45019e..ef0acfba258 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -96,10 +96,10 @@ public:
     // prepare operators for pipelineX
     Status prepare(RuntimeState* state);
 
-    Status set_sink(OperatorBuilderPtr& sink_operator);
+    Status set_sink_builder(OperatorBuilderPtr& sink_operator_builder);
     Status set_sink(DataSinkOperatorXPtr& sink_operator);
 
-    OperatorBuilderBase* sink() { return _sink.get(); }
+    OperatorBuilderBase* get_sink_builder() { return _sink_builder.get(); }
     DataSinkOperatorXBase* sink_x() { return _sink_x.get(); }
     OperatorXs& operator_xs() { return operatorXs; }
     DataSinkOperatorXPtr sink_shared_pointer() { return _sink_x; }
@@ -185,7 +185,7 @@ private:
     void _init_profile();
 
     OperatorBuilders _operator_builders; // left is _source, right is _root
-    OperatorBuilderPtr _sink;            // put block to sink
+    OperatorBuilderPtr _sink_builder;    // put block to sink
 
     std::mutex _depend_mutex;
     std::vector<std::pair<int, std::weak_ptr<Pipeline>>> _parents;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 034fbaf2951..695fd6f4d3d 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -35,6 +35,7 @@
 
 #include "common/config.h"
 #include "common/logging.h"
+#include "common/status.h"
 #include "exec/data_sink.h"
 #include "exec/exec_node.h"
 #include "exec/scan_node.h"
@@ -332,6 +333,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     _runtime_state->set_num_local_sink(request.num_local_sink);
 
     if (request.fragment.__isset.output_sink) {
+        // Here we build a DataSink object, which will be held by DataSinkOperator
         RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
                 _runtime_state->obj_pool(), request.fragment.output_sink,
                 request.fragment.output_exprs, request, idx, _root_plan->row_desc(),
@@ -343,6 +345,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
         _root_pipeline->set_collect_query_statistics_with_every_batch();
     RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline));
     if (_sink) {
+        // DataSinkOperator is built here
        RETURN_IF_ERROR(_create_sink(request.local_params[idx].sender_id,
                                     request.fragment.output_sink, _runtime_state.get()));
     }
@@ -366,14 +369,15 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
     _total_tasks = 0;
     for (PipelinePtr& pipeline : _pipelines) {
         // if sink
-        auto sink = pipeline->sink()->build_operator();
+        auto sink_operator = pipeline->get_sink_builder()->build_operator();
         // TODO pipeline 1 need to add new interface for exec node and operator
-        static_cast<void>(sink->init(request.fragment.output_sink));
+        RETURN_IF_ERROR(sink_operator->init(request.fragment.output_sink));
         RETURN_IF_ERROR(pipeline->build_operators());
 
-        auto task = std::make_unique<PipelineTask>(pipeline, _total_tasks++, _runtime_state.get(),
-                                                   sink, this, pipeline->pipeline_profile());
-        static_cast<void>(sink->set_child(task->get_root()));
+        auto task =
+                std::make_unique<PipelineTask>(pipeline, _total_tasks++, _runtime_state.get(),
+                                               sink_operator, this, pipeline->pipeline_profile());
+        RETURN_IF_ERROR(sink_operator->set_child(task->get_root()));
         _tasks.emplace_back(std::move(task));
         _runtime_profile->add_child(pipeline->pipeline_profile(), true, nullptr);
     }
@@ -524,7 +528,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
             RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id), new_child_pipeline));
             OperatorBuilderPtr child_sink_builder = std::make_shared<UnionSinkOperatorBuilder>(
                     union_node->id(), child_id, union_node, data_queue);
-            RETURN_IF_ERROR(new_child_pipeline->set_sink(child_sink_builder));
+            RETURN_IF_ERROR(new_child_pipeline->set_sink_builder(child_sink_builder));
         }
         OperatorBuilderPtr source_builder = std::make_shared<UnionSourceOperatorBuilder>(
                 node->id(), union_node, data_queue);
@@ -541,7 +545,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
             OperatorBuilderPtr pre_agg_sink =
                     std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
                                                                               data_queue);
-            RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
+            RETURN_IF_ERROR(new_pipe->set_sink_builder(pre_agg_sink));
 
             OperatorBuilderPtr pre_agg_source =
                     std::make_shared<DistinctStreamingAggSourceOperatorBuilder>(
@@ -551,7 +555,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
             auto data_queue = std::make_shared<DataQueue>(1);
             OperatorBuilderPtr pre_agg_sink = std::make_shared<StreamingAggSinkOperatorBuilder>(
                     node->id(), agg_node, data_queue);
-            RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
+            RETURN_IF_ERROR(new_pipe->set_sink_builder(pre_agg_sink));
 
             OperatorBuilderPtr pre_agg_source = std::make_shared<StreamingAggSourceOperatorBuilder>(
                     node->id(), agg_node, data_queue);
@@ -559,7 +563,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         } else {
             OperatorBuilderPtr agg_sink =
                     std::make_shared<AggSinkOperatorBuilder>(node->id(), agg_node);
-            RETURN_IF_ERROR(new_pipe->set_sink(agg_sink));
+            RETURN_IF_ERROR(new_pipe->set_sink_builder(agg_sink));
 
             OperatorBuilderPtr agg_source =
                     std::make_shared<AggSourceOperatorBuilder>(node->id(), agg_node);
@@ -572,7 +576,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
         OperatorBuilderPtr sort_sink =
                 std::make_shared<SortSinkOperatorBuilder>(node->id(), node);
-        RETURN_IF_ERROR(new_pipeline->set_sink(sort_sink));
+        RETURN_IF_ERROR(new_pipeline->set_sink_builder(sort_sink));
 
         OperatorBuilderPtr sort_source =
                std::make_shared<SortSourceOperatorBuilder>(node->id(), node);
@@ -585,7 +589,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
 
         OperatorBuilderPtr partition_sort_sink =
                 std::make_shared<PartitionSortSinkOperatorBuilder>(node->id(), node);
-        RETURN_IF_ERROR(new_pipeline->set_sink(partition_sort_sink));
+        RETURN_IF_ERROR(new_pipeline->set_sink_builder(partition_sort_sink));
 
         OperatorBuilderPtr partition_sort_source =
                 std::make_shared<PartitionSortSourceOperatorBuilder>(node->id(), node);
@@ -598,7 +602,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
 
         OperatorBuilderPtr analytic_sink =
                 std::make_shared<AnalyticSinkOperatorBuilder>(node->id(), node);
-        RETURN_IF_ERROR(new_pipeline->set_sink(analytic_sink));
+        RETURN_IF_ERROR(new_pipeline->set_sink_builder(analytic_sink));
 
         OperatorBuilderPtr analytic_source =
                 std::make_shared<AnalyticSourceOperatorBuilder>(node->id(), node);
@@ -637,7 +641,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         }
         OperatorBuilderPtr join_sink =
                 std::make_shared<HashJoinBuildSinkBuilder>(node->id(), join_node);
-        RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
+        RETURN_IF_ERROR(new_pipe->set_sink_builder(join_sink));
 
         RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
         OperatorBuilderPtr join_source =
@@ -652,7 +656,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe));
         OperatorBuilderPtr join_sink =
                 std::make_shared<NestLoopJoinBuildOperatorBuilder>(node->id(), node);
-        RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
+        RETURN_IF_ERROR(new_pipe->set_sink_builder(join_sink));
 
         RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
         OperatorBuilderPtr join_source =
@@ -690,7 +694,7 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode
     RETURN_IF_ERROR(_build_pipelines(node->child(0), build_pipeline));
     OperatorBuilderPtr sink_builder =
             std::make_shared<SetSinkOperatorBuilder<is_intersect>>(node->id(), node);
-    RETURN_IF_ERROR(build_pipeline->set_sink(sink_builder));
+    RETURN_IF_ERROR(build_pipeline->set_sink_builder(sink_builder));
 
     for (int child_id = 1; child_id < node->children_count(); ++child_id) {
         auto probe_pipeline = add_pipeline();
@@ -698,7 +702,7 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode
         OperatorBuilderPtr probe_sink_builder =
                 std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>(node->id(), child_id,
                                                                             node);
-        RETURN_IF_ERROR(probe_pipeline->set_sink(probe_sink_builder));
+        RETURN_IF_ERROR(probe_pipeline->set_sink_builder(probe_sink_builder));
     }
 
     OperatorBuilderPtr source_builder =
@@ -827,7 +831,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
     case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
         sink_ = std::make_shared<MultiCastDataStreamSinkOperatorBuilder>(next_operator_builder_id(),
                                                                          _sink.get());
-        RETURN_IF_ERROR(_root_pipeline->set_sink(sink_));
+        RETURN_IF_ERROR(_root_pipeline->set_sink_builder(sink_));
 
         auto& multi_cast_data_streamer =
                 assert_cast<vectorized::MultiCastDataStreamSink*>(_sink.get())
@@ -862,7 +866,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
             // 3. create and set sink operator of data stream sender for new pipeline
            OperatorBuilderPtr sink_op_builder = std::make_shared<ExchangeSinkOperatorBuilder>(
                     next_operator_builder_id(), _multi_cast_stream_sink_senders[i].get(), i);
-            static_cast<void>(new_pipeline->set_sink(sink_op_builder));
+            static_cast<void>(new_pipeline->set_sink_builder(sink_op_builder));
 
             // 4. init and prepare the data_stream_sender of diff exchange
             TDataSink t;
@@ -876,7 +880,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
     default:
         return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
     }
-    return _root_pipeline->set_sink(sink_);
+    return _root_pipeline->set_sink_builder(sink_);
 }
 
 // If all pipeline tasks binded to the fragment instance are finished, then we could

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
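
For a quick view of how the renamed API is used, the sketch below mirrors the call pattern visible in the diff: a sink builder is attached to a pipeline via set_sink_builder(), and at task-build time get_sink_builder()->build_operator() produces the actual sink operator, which is then initialized (as in _build_pipeline_tasks). The Status, SinkOperator, OperatorBuilder, and Pipeline types here are simplified, hypothetical stand-ins rather than the real Doris classes, so the example compiles on its own (C++17) but is illustrative only.

// Illustrative sketch only: simplified stand-ins for the Doris pipeline types,
// showing the set_sink_builder / get_sink_builder call pattern after the rename.
#include <iostream>
#include <memory>
#include <string>
#include <utility>

// Hypothetical minimal stand-in for doris::Status.
struct Status {
    bool ok = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status InternalError(std::string m) { return {false, std::move(m)}; }
};

// Stand-in for the sink operator that a builder produces.
struct SinkOperator {
    Status init() { return Status::OK(); }
};

// Stand-in for OperatorBuilderBase / OperatorBuilderPtr.
struct OperatorBuilder {
    std::shared_ptr<SinkOperator> build_operator() { return std::make_shared<SinkOperator>(); }
};
using OperatorBuilderPtr = std::shared_ptr<OperatorBuilder>;

// Stand-in for Pipeline with the renamed member and accessors.
class Pipeline {
public:
    Status set_sink_builder(OperatorBuilderPtr builder) {
        if (_sink_builder) {
            return Status::InternalError("set sink twice");
        }
        _sink_builder = std::move(builder);
        return Status::OK();
    }
    OperatorBuilder* get_sink_builder() { return _sink_builder.get(); }

private:
    OperatorBuilderPtr _sink_builder; // was `_sink` before this commit
};

int main() {
    Pipeline pipeline;
    // Plan-building time: attach the sink *builder* to the pipeline.
    Status st = pipeline.set_sink_builder(std::make_shared<OperatorBuilder>());
    if (!st.ok) {
        std::cerr << st.msg << std::endl;
        return 1;
    }
    // Task-building time: turn the builder into the actual sink operator,
    // then initialize it, mirroring _build_pipeline_tasks in the diff above.
    auto sink_operator = pipeline.get_sink_builder()->build_operator();
    st = sink_operator->init();
    std::cout << (st.ok ? "sink operator ready" : st.msg) << std::endl;
    return 0;
}

The rename itself does not change behavior: the pipeline still stores a single builder (guarded by the "set sink twice" check) and only converts it into a sink operator when pipeline tasks are built.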