This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bd582aee75a [pipelineX](minor) refine code (#25015) bd582aee75a is described below commit bd582aee75adc5d82e65280dca18b731ecf08755 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Sat Oct 7 10:45:33 2023 +0800 [pipelineX](minor) refine code (#25015) --- be/src/pipeline/exec/analytic_sink_operator.cpp | 2 + be/src/pipeline/exec/analytic_sink_operator.h | 2 +- ...ream_sink.h => multi_cast_data_stream_sink.cpp} | 31 +++------ be/src/pipeline/exec/multi_cast_data_stream_sink.h | 74 +++++++++++++++++++++- .../exec/multi_cast_data_stream_source.cpp | 12 ++-- .../pipeline/exec/multi_cast_data_stream_source.h | 71 +-------------------- be/src/pipeline/exec/union_source_operator.cpp | 2 +- be/src/pipeline/exec/union_source_operator.h | 2 +- be/src/pipeline/pipeline_x/dependency.h | 6 +- be/src/pipeline/pipeline_x/operator.cpp | 19 ++---- be/src/pipeline/pipeline_x/operator.h | 4 -- .../pipeline_x/pipeline_x_fragment_context.cpp | 1 + 12 files changed, 104 insertions(+), 122 deletions(-) diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 2b35e1b6a2c..d839be0dc16 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -215,4 +215,6 @@ Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block, return Status::OK(); } +template class DataSinkOperatorX<AnalyticSinkLocalState>; + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 41d276205be..c8583925865 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -102,4 +102,4 @@ private: }; } // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h 
b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp similarity index 57% copy from be/src/pipeline/exec/multi_cast_data_stream_sink.h copy to be/src/pipeline/exec/multi_cast_data_stream_sink.cpp index e137a7e6558..b44f15d13e9 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp @@ -15,33 +15,20 @@ // specific language governing permissions and limitations // under the License. -#pragma once - -#include "operator.h" -#include "vec/sink/multi_cast_data_stream_sink.h" +#include "multi_cast_data_stream_sink.h" namespace doris::pipeline { -class MultiCastDataStreamSinkOperatorBuilder final - : public DataSinkOperatorBuilder<vectorized::MultiCastDataStreamSink> { -public: - MultiCastDataStreamSinkOperatorBuilder(int32_t id, DataSink* sink) - : DataSinkOperatorBuilder(id, "MultiCastDataStreamSinkOperator", sink) {} - - OperatorPtr build_operator() override; -}; - -class MultiCastDataStreamSinkOperator final - : public DataSinkOperator<MultiCastDataStreamSinkOperatorBuilder> { -public: - MultiCastDataStreamSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) - : DataSinkOperator(operator_builder, sink) {} - - bool can_write() override { return true; } -}; - OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() { return std::make_shared<MultiCastDataStreamSinkOperator>(this, _sink); } +Status MultiCastDataStreamSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + auto& p = _parent->cast<MultiCastDataStreamSinkOperatorX>(); + _shared_state->multi_cast_data_streamer = std::make_shared<pipeline::MultiCastDataStreamer>( + p._row_desc, p._pool, p._cast_sender_count); + return Status::OK(); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h index e137a7e6558..f949b624c7b 100644 --- 
a/be/src/pipeline/exec/multi_cast_data_stream_sink.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h @@ -18,6 +18,7 @@ #pragma once #include "operator.h" +#include "pipeline/pipeline_x/operator.h" #include "vec/sink/multi_cast_data_stream_sink.h" namespace doris::pipeline { @@ -40,8 +41,75 @@ public: bool can_write() override { return true; } }; -OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() { - return std::make_shared<MultiCastDataStreamSinkOperator>(this, _sink); -} +class MultiCastDataStreamSinkOperatorX; +class MultiCastDataStreamSinkLocalState final + : public PipelineXSinkLocalState<MultiCastDependency> { + ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState); + MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {} + + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + friend class MultiCastDataStreamSinkOperatorX; + friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>; + using Base = PipelineXSinkLocalState<MultiCastDependency>; + using Parent = MultiCastDataStreamSinkOperatorX; + +private: + std::shared_ptr<pipeline::MultiCastDataStreamer> _multi_cast_data_streamer; +}; + +class MultiCastDataStreamSinkOperatorX final + : public DataSinkOperatorX<MultiCastDataStreamSinkLocalState> { + using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>; + +public: + MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources, + const int cast_sender_count, ObjectPool* pool, + const TMultiCastDataStreamSink& sink, + const RowDescriptor& row_desc) + : Base(sink_id, sources), + _pool(pool), + _row_desc(row_desc), + _cast_sender_count(cast_sender_count) {} + ~MultiCastDataStreamSinkOperatorX() override = default; + Status init(const TDataSink& tsink) override { return Status::OK(); } + + Status open(doris::RuntimeState* state) override { return Status::OK(); }; + + Status prepare(RuntimeState* state) override { return 
Status::OK(); } + + Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state); + SCOPED_TIMER(local_state.profile()->total_time_counter()); + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + if (in_block->rows() > 0 || source_state == SourceState::FINISHED) { + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + auto st = local_state._shared_state->multi_cast_data_streamer->push( + state, in_block, source_state == SourceState::FINISHED); + // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished + if (st.template is<ErrorCode::END_OF_FILE>()) { + return Status::OK(); + } + return st; + } + return Status::OK(); + } + + RowDescriptor& row_desc() override { return _row_desc; } + + std::shared_ptr<pipeline::MultiCastDataStreamer> create_multi_cast_data_streamer() { + auto multi_cast_data_streamer = std::make_shared<pipeline::MultiCastDataStreamer>( + _row_desc, _pool, _cast_sender_count); + return multi_cast_data_streamer; + } + +private: + friend class MultiCastDataStreamSinkLocalState; + ObjectPool* _pool; + RowDescriptor _row_desc; + int _cast_sender_count; + friend class MultiCastDataStreamSinkLocalState; +}; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index c0e7b146594..c70d87f59e0 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -130,11 +130,9 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast<Parent>(); - if (p._t_data_stream_sink.__isset.output_exprs) { - _output_expr_contexts.resize(p._output_expr_contexts.size()); - for (size_t i = 0; i < 
p._output_expr_contexts.size(); i++) { - RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i])); - } + _output_expr_contexts.resize(p._output_expr_contexts.size()); + for (size_t i = 0; i < p._output_expr_contexts.size(); i++) { + RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i])); } return Status::OK(); } @@ -150,7 +148,7 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, if (!local_state._output_expr_contexts.empty()) { output_block = &tmp_block; } - local_state._shared_state->_multi_cast_data_streamer->pull(_consumer_id, output_block, &eos); + local_state._shared_state->multi_cast_data_streamer->pull(_consumer_id, output_block, &eos); if (!local_state._conjuncts.empty()) { RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, @@ -162,9 +160,11 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, local_state._output_expr_contexts, *output_block, block)); materialize_block_inplace(*block); } + COUNTER_UPDATE(local_state._rows_returned_counter, block->rows()); if (eos) { source_state = SourceState::FINISHED; } return Status::OK(); } + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index aa20272d07b..3d2b8157fa7 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -108,6 +108,7 @@ public: private: vectorized::VExprContextSPtrs _output_expr_contexts; }; + class MultiCastDataStreamerSourceOperatorX final : public OperatorX<MultiCastDataStreamSourceLocalState> { public: @@ -169,73 +170,5 @@ private: const RowDescriptor& _row_desc() { return _row_descriptor; } }; -// sink operator - -class MultiCastDataStreamSinkOperatorX; -class MultiCastDataStreamSinkLocalState final - : public PipelineXSinkLocalState<MultiCastDependency> { - 
ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState); - MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) - : Base(parent, state) {} - friend class MultiCastDataStreamSinkOperatorX; - friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>; - using Base = PipelineXSinkLocalState<MultiCastDependency>; - using Parent = MultiCastDataStreamSinkOperatorX; -}; - -class MultiCastDataStreamSinkOperatorX final - : public DataSinkOperatorX<MultiCastDataStreamSinkLocalState> { - using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>; - -public: - friend class UnionSinkLocalState; - MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources, - const int cast_sender_count, ObjectPool* pool, - const TMultiCastDataStreamSink& sink, - const RowDescriptor& row_desc) - : Base(sink_id, sources), - _pool(pool), - _row_desc(row_desc), - _cast_sender_count(cast_sender_count) {} - ~MultiCastDataStreamSinkOperatorX() override = default; - Status init(const TDataSink& tsink) override { return Status::OK(); } - - Status open(doris::RuntimeState* state) override { return Status::OK(); }; - - Status prepare(RuntimeState* state) override { return Status::OK(); } - - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override { - CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); - COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - if (in_block->rows() > 0 || source_state == SourceState::FINISHED) { - COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - auto st = local_state._shared_state->_multi_cast_data_streamer->push( - state, in_block, source_state == SourceState::FINISHED); - // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished - if (st.template is<ErrorCode::END_OF_FILE>()) { - return Status::OK(); - } - return st; - } - return 
Status::OK(); - } - - std::shared_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer() { - auto multi_cast_data_streamer = std::make_shared<pipeline::MultiCastDataStreamer>( - _row_desc, _pool, _cast_sender_count); - return multi_cast_data_streamer; - } - - RowDescriptor& row_desc() override { return _row_desc; } - -private: - ObjectPool* _pool; - RowDescriptor _row_desc; - int _cast_sender_count; - friend class MultiCastDataStreamSinkLocalState; -}; - } // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 67a8ac6d4af..54a9603e7f6 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -124,7 +124,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { return Status::OK(); } -std::shared_ptr<DataQueue> UnionSourceLocalState::data_queue() { +std::shared_ptr<DataQueue> UnionSourceLocalState::create_data_queue() { auto& p = _parent->cast<Parent>(); std::shared_ptr<DataQueue> data_queue = std::make_shared<DataQueue>(p._child_size, _dependency); return data_queue; diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index a22a0e8c0a2..d02176fc8de 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -78,7 +78,7 @@ public: UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}; Status init(RuntimeState* state, LocalStateInfo& info) override; - std::shared_ptr<DataQueue> data_queue(); + std::shared_ptr<DataQueue> create_data_queue(); private: friend class UnionSourceOperatorX; diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 2ea0992bebc..c69b49870d5 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ 
b/be/src/pipeline/pipeline_x/dependency.h @@ -111,7 +111,7 @@ protected: class WriteDependency : public Dependency { public: WriteDependency(int id, std::string name) : Dependency(id, name), _ready_for_write(true) {} - virtual ~WriteDependency() = default; + ~WriteDependency() override = default; bool is_write_dependency() override { return true; } @@ -428,7 +428,7 @@ private: struct MultiCastSharedState { public: - std::shared_ptr<pipeline::MultiCastDataStreamer> _multi_cast_data_streamer; + std::shared_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer; }; class MultiCastDependency final : public WriteDependency { @@ -438,7 +438,7 @@ public: ~MultiCastDependency() override = default; void* shared_state() override { return (void*)&_multi_cast_state; }; MultiCastDependency* can_read(const int consumer_id) { - if (_multi_cast_state._multi_cast_data_streamer->can_read(consumer_id)) { + if (_multi_cast_state.multi_cast_data_streamer->can_read(consumer_id)) { return nullptr; } else { return this; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index c6331b04fbe..81bee3063de 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -37,6 +37,7 @@ #include "pipeline/exec/jdbc_scan_operator.h" #include "pipeline/exec/jdbc_table_sink_operator.h" #include "pipeline/exec/meta_scan_operator.h" +#include "pipeline/exec/multi_cast_data_stream_sink.h" #include "pipeline/exec/multi_cast_data_stream_source.h" #include "pipeline/exec/nested_loop_join_build_operator.h" #include "pipeline/exec/nested_loop_join_probe_operator.h" @@ -257,20 +258,14 @@ Status DataSinkOperatorXBase::init(const TPlanNode& tnode, RuntimeState* state) return Status::OK(); } -template <typename LocalStateType> -Status DataSinkOperatorX<LocalStateType>::setup_local_state(RuntimeState* state, - LocalSinkStateInfo& info) { - auto local_state = LocalStateType::create_shared(this, state); - 
state->emplace_sink_local_state(id(), local_state); - return local_state->init(state, info); -} - template <typename LocalStateType> Status DataSinkOperatorX<LocalStateType>::setup_local_states( RuntimeState* state, std::vector<LocalSinkStateInfo>& infos) { DCHECK(infos.size() == 1); for (auto& info : infos) { - RETURN_IF_ERROR(setup_local_state(state, info)); + auto local_state = LocalStateType::create_shared(this, state); + state->emplace_sink_local_state(id(), local_state); + RETURN_IF_ERROR(local_state->init(state, info)); } return Status::OK(); } @@ -279,12 +274,12 @@ template <> Status DataSinkOperatorX<MultiCastDataStreamSinkLocalState>::setup_local_states( RuntimeState* state, std::vector<LocalSinkStateInfo>& infos) { auto multi_cast_data_streamer = - static_cast<MultiCastDataStreamSinkOperatorX*>(this)->multi_cast_data_streamer(); + static_cast<MultiCastDataStreamSinkOperatorX*>(this)->create_multi_cast_data_streamer(); for (auto& info : infos) { auto local_state = MultiCastDataStreamSinkLocalState::create_shared(this, state); state->emplace_sink_local_state(id(), local_state); RETURN_IF_ERROR(local_state->init(state, info)); - local_state->_shared_state->_multi_cast_data_streamer = multi_cast_data_streamer; + local_state->_shared_state->multi_cast_data_streamer = multi_cast_data_streamer; } return Status::OK(); @@ -331,7 +326,7 @@ Status OperatorX<UnionSourceLocalState>::setup_local_states(RuntimeState* state, RETURN_IF_ERROR(local_state->init(state, info)); if (child_count != 0) { if (!data_queue) { - data_queue = local_state->data_queue(); + data_queue = local_state->create_data_queue(); } local_state->_shared_state->data_queue = data_queue; } diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 53a294412a2..149d28265e3 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -419,8 +419,6 @@ public: Status prepare(RuntimeState* state) override { return Status::OK(); } 
Status open(RuntimeState* state) override { return Status::OK(); } - virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) = 0; - virtual Status setup_local_states(RuntimeState* state, std::vector<LocalSinkStateInfo>& infos) = 0; @@ -529,8 +527,6 @@ public: : DataSinkOperatorXBase(id, sources) {} ~DataSinkOperatorX() override = default; - Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override; - Status setup_local_states(RuntimeState* state, std::vector<LocalSinkStateInfo>& infos) override; void get_dependency(std::vector<DependencySPtr>& dependency) override; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 4dedbce44f1..85a9527eeb6 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -60,6 +60,7 @@ #include "pipeline/exec/jdbc_scan_operator.h" #include "pipeline/exec/jdbc_table_sink_operator.h" #include "pipeline/exec/meta_scan_operator.h" +#include "pipeline/exec/multi_cast_data_stream_sink.h" #include "pipeline/exec/multi_cast_data_stream_source.h" #include "pipeline/exec/nested_loop_join_build_operator.h" #include "pipeline/exec/nested_loop_join_probe_operator.h" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org