Gabriel39 commented on code in PR #24656: URL: https://github.com/apache/doris/pull/24656#discussion_r1331179193
########## be/src/pipeline/exec/multi_cast_data_stream_source.h: ########## @@ -83,11 +86,157 @@ class MultiCastDataStreamerSourceOperator final : public OperatorBase, private: const int _consumer_id; std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer; - TDataStreamSink _t_data_stream_sink; + const TDataStreamSink _t_data_stream_sink; vectorized::VExprContextSPtrs _output_expr_contexts; vectorized::VExprContextSPtrs _conjuncts; }; +class MultiCastDataStreamerSourceOperatorX; + +class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<MultiCastDependency> { +public: + ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState); + using Base = PipelineXLocalState<MultiCastDependency>; + using Parent = MultiCastDataStreamerSourceOperatorX; + MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent) + : Base(state, parent) {}; + + Status init(RuntimeState* state, LocalStateInfo& info) override; + friend class MultiCastDataStreamerSourceOperatorX; + vectorized::VExprContextSPtrs _output_expr_contexts; +}; +class MultiCastDataStreamerSourceOperatorX final + : public OperatorX<MultiCastDataStreamSourceLocalState> { +public: + using Base = OperatorX<MultiCastDataStreamSourceLocalState>; + MultiCastDataStreamerSourceOperatorX(const int consumer_id, ObjectPool* pool, + const TDataStreamSink& sink, + const RowDescriptor& row_descriptor, int id) + : Base(pool, id), + _consumer_id(consumer_id), + _t_data_stream_sink(sink), + _row_descriptor(row_descriptor) {}; + ~MultiCastDataStreamerSourceOperatorX() override = default; + Dependency* wait_for_dependency(RuntimeState* state) override { + auto& local_state = + state->get_local_state(id())->cast<MultiCastDataStreamSourceLocalState>(); + if (local_state._shared_state->_multi_cast_data_streamer->can_read(_consumer_id)) { Review Comment: Do this in MultiCastDependency ########## be/src/pipeline/exec/multi_cast_data_stream_source.h: ########## @@ -83,11 +86,157 @@ class 
MultiCastDataStreamerSourceOperator final : public OperatorBase, private: const int _consumer_id; std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer; - TDataStreamSink _t_data_stream_sink; + const TDataStreamSink _t_data_stream_sink; vectorized::VExprContextSPtrs _output_expr_contexts; vectorized::VExprContextSPtrs _conjuncts; }; +class MultiCastDataStreamerSourceOperatorX; + +class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<MultiCastDependency> { +public: + ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState); + using Base = PipelineXLocalState<MultiCastDependency>; + using Parent = MultiCastDataStreamerSourceOperatorX; + MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent) + : Base(state, parent) {}; + + Status init(RuntimeState* state, LocalStateInfo& info) override; + friend class MultiCastDataStreamerSourceOperatorX; + vectorized::VExprContextSPtrs _output_expr_contexts; +}; +class MultiCastDataStreamerSourceOperatorX final + : public OperatorX<MultiCastDataStreamSourceLocalState> { +public: + using Base = OperatorX<MultiCastDataStreamSourceLocalState>; + MultiCastDataStreamerSourceOperatorX(const int consumer_id, ObjectPool* pool, + const TDataStreamSink& sink, + const RowDescriptor& row_descriptor, int id) + : Base(pool, id), + _consumer_id(consumer_id), + _t_data_stream_sink(sink), + _row_descriptor(row_descriptor) {}; + ~MultiCastDataStreamerSourceOperatorX() override = default; + Dependency* wait_for_dependency(RuntimeState* state) override { + auto& local_state = + state->get_local_state(id())->cast<MultiCastDataStreamSourceLocalState>(); + if (local_state._shared_state->_multi_cast_data_streamer->can_read(_consumer_id)) { + return nullptr; + } else { + return local_state._dependency; + } + } + + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); + // RETURN_IF_ERROR(vectorized::RuntimeFilterConsumer::init(state)); + // init profile for 
runtime filter + // RuntimeFilterConsumer::_init_profile(local_state._shared_state->_multi_cast_data_streamer->profile()); + if (_t_data_stream_sink.__isset.output_exprs) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.output_exprs, + _output_expr_contexts)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_contexts, state, _row_desc())); + } + + if (_t_data_stream_sink.__isset.conjuncts) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts, + _conjuncts)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state, _row_desc())); + } + return Status::OK(); + } + + Status open(RuntimeState* state) override { + RETURN_IF_ERROR(Base::open(state)); + if (_t_data_stream_sink.__isset.output_exprs) { + RETURN_IF_ERROR(vectorized::VExpr::open(_output_expr_contexts, state)); + } + if (_t_data_stream_sink.__isset.conjuncts) { + RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state)); + } + return Status::OK(); + } + + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override; + + bool is_source() const override { return true; } + +private: + friend class MultiCastDataStreamSourceLocalState; + const int _consumer_id; + const TDataStreamSink _t_data_stream_sink; + vectorized::VExprContextSPtrs _output_expr_contexts; + const RowDescriptor& _row_descriptor; + const RowDescriptor& _row_desc() { return _row_descriptor; } +}; + +// sink operator + +class MultiCastDataStreamSinkOperatorX; +class MultiCastDataStreamSinkLocalState final + : public PipelineXSinkLocalState<MultiCastDependency> { + ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState); + MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {} + std::shared_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer(); + friend class MultiCastDataStreamSinkOperatorX; + friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>; + 
using Base = PipelineXSinkLocalState<MultiCastDependency>; + using Parent = MultiCastDataStreamSinkOperatorX; +}; +class MultiCastDataStreamSinkOperatorX final + : public DataSinkOperatorX<MultiCastDataStreamSinkLocalState> { + using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>; + +public: + friend class UnionSinkLocalState; + MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources, + const int cast_sender_count, ObjectPool* pool, + const TMultiCastDataStreamSink& sink, + const RowDescriptor& row_desc) + : Base(sink_id, sources), + _pool(pool), + _row_desc(row_desc), + _cast_sender_count(cast_sender_count) {} + ~MultiCastDataStreamSinkOperatorX() override = default; + Status init(const TDataSink& tsink) override { return Status::OK(); } Review Comment: Do not need to override? ########## be/src/pipeline/exec/multi_cast_data_stream_source.h: ########## @@ -83,11 +86,157 @@ class MultiCastDataStreamerSourceOperator final : public OperatorBase, private: const int _consumer_id; std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer; - TDataStreamSink _t_data_stream_sink; + const TDataStreamSink _t_data_stream_sink; vectorized::VExprContextSPtrs _output_expr_contexts; vectorized::VExprContextSPtrs _conjuncts; }; +class MultiCastDataStreamerSourceOperatorX; + +class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<MultiCastDependency> { +public: + ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState); + using Base = PipelineXLocalState<MultiCastDependency>; + using Parent = MultiCastDataStreamerSourceOperatorX; + MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent) + : Base(state, parent) {}; + + Status init(RuntimeState* state, LocalStateInfo& info) override; + friend class MultiCastDataStreamerSourceOperatorX; + vectorized::VExprContextSPtrs _output_expr_contexts; Review Comment: public variable should not have a `_` prefix ########## be/src/pipeline/pipeline_x/operator.cpp: 
########## @@ -255,11 +256,43 @@ Status DataSinkOperatorX<LocalStateType>::setup_local_state(RuntimeState* state, } template <typename LocalStateType> -void DataSinkOperatorX<LocalStateType>::get_dependency(DependencySPtr& dependency) { +Status DataSinkOperatorX<LocalStateType>::setup_local_states( + RuntimeState* state, std::vector<LocalSinkStateInfo>& infos) { + DCHECK(infos.size() == 1); + for (auto& info : infos) { + RETURN_IF_ERROR(setup_local_state(state, info)); + } + return Status::OK(); +} + +template <> +Status DataSinkOperatorX<MultiCastDataStreamSinkLocalState>::setup_local_states( + RuntimeState* state, std::vector<LocalSinkStateInfo>& infos) { + auto multi_cast_data_streamer = + static_cast<MultiCastDataStreamSinkOperatorX*>(this)->multi_cast_data_streamer(); + for (auto& info : infos) { + auto local_state = MultiCastDataStreamSinkLocalState::create_shared(this, state); + state->emplace_sink_local_state(id(), local_state); + RETURN_IF_ERROR(local_state->init(state, info)); + local_state->_shared_state->_multi_cast_data_streamer = multi_cast_data_streamer; + } + + return Status::OK(); +} + +template <typename LocalStateType> +void DataSinkOperatorX<LocalStateType>::get_dependency(vector<DependencySPtr>& dependency) { + using DependencyType = typename LocalStateType::Dependency; if constexpr (!std::is_same_v<typename LocalStateType::Dependency, FakeDependency>) { - dependency.reset(new typename LocalStateType::Dependency(dest_id())); + auto& dests = dests_id(); + for (auto& dest_id : dests) { + dependency.push_back(std::make_shared<DependencyType>(dest_id)); + } } else { - dependency.reset((typename LocalStateType::Dependency*)nullptr); + auto& dests = dests_id(); Review Comment: In this pass, dests.size() is always 1 so we can use `dependency.push_back(nullptr);` directly ########## be/src/pipeline/exec/multi_cast_data_stream_source.h: ########## @@ -83,11 +86,157 @@ class MultiCastDataStreamerSourceOperator final : public OperatorBase, private: const 
int _consumer_id; std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer; - TDataStreamSink _t_data_stream_sink; + const TDataStreamSink _t_data_stream_sink; vectorized::VExprContextSPtrs _output_expr_contexts; vectorized::VExprContextSPtrs _conjuncts; }; +class MultiCastDataStreamerSourceOperatorX; + +class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<MultiCastDependency> { +public: + ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState); + using Base = PipelineXLocalState<MultiCastDependency>; + using Parent = MultiCastDataStreamerSourceOperatorX; + MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent) + : Base(state, parent) {}; + + Status init(RuntimeState* state, LocalStateInfo& info) override; + friend class MultiCastDataStreamerSourceOperatorX; + vectorized::VExprContextSPtrs _output_expr_contexts; +}; +class MultiCastDataStreamerSourceOperatorX final + : public OperatorX<MultiCastDataStreamSourceLocalState> { +public: + using Base = OperatorX<MultiCastDataStreamSourceLocalState>; + MultiCastDataStreamerSourceOperatorX(const int consumer_id, ObjectPool* pool, + const TDataStreamSink& sink, + const RowDescriptor& row_descriptor, int id) + : Base(pool, id), + _consumer_id(consumer_id), + _t_data_stream_sink(sink), + _row_descriptor(row_descriptor) {}; + ~MultiCastDataStreamerSourceOperatorX() override = default; + Dependency* wait_for_dependency(RuntimeState* state) override { + auto& local_state = + state->get_local_state(id())->cast<MultiCastDataStreamSourceLocalState>(); + if (local_state._shared_state->_multi_cast_data_streamer->can_read(_consumer_id)) { + return nullptr; + } else { + return local_state._dependency; + } + } + + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); + // RETURN_IF_ERROR(vectorized::RuntimeFilterConsumer::init(state)); + // init profile for runtime filter + // 
RuntimeFilterConsumer::_init_profile(local_state._shared_state->_multi_cast_data_streamer->profile()); + if (_t_data_stream_sink.__isset.output_exprs) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.output_exprs, + _output_expr_contexts)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_contexts, state, _row_desc())); + } + + if (_t_data_stream_sink.__isset.conjuncts) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts, + _conjuncts)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state, _row_desc())); + } + return Status::OK(); + } + + Status open(RuntimeState* state) override { + RETURN_IF_ERROR(Base::open(state)); + if (_t_data_stream_sink.__isset.output_exprs) { + RETURN_IF_ERROR(vectorized::VExpr::open(_output_expr_contexts, state)); + } + if (_t_data_stream_sink.__isset.conjuncts) { + RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state)); + } + return Status::OK(); + } + + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override; + + bool is_source() const override { return true; } + +private: + friend class MultiCastDataStreamSourceLocalState; + const int _consumer_id; + const TDataStreamSink _t_data_stream_sink; + vectorized::VExprContextSPtrs _output_expr_contexts; + const RowDescriptor& _row_descriptor; + const RowDescriptor& _row_desc() { return _row_descriptor; } +}; + +// sink operator + +class MultiCastDataStreamSinkOperatorX; +class MultiCastDataStreamSinkLocalState final + : public PipelineXSinkLocalState<MultiCastDependency> { + ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState); + MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {} + std::shared_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer(); + friend class MultiCastDataStreamSinkOperatorX; + friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>; + using Base = 
PipelineXSinkLocalState<MultiCastDependency>; + using Parent = MultiCastDataStreamSinkOperatorX; +}; Review Comment: Append an empty line ########## be/src/pipeline/exec/multi_cast_data_stream_source.h: ########## @@ -83,11 +86,157 @@ class MultiCastDataStreamerSourceOperator final : public OperatorBase, private: const int _consumer_id; std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer; - TDataStreamSink _t_data_stream_sink; + const TDataStreamSink _t_data_stream_sink; vectorized::VExprContextSPtrs _output_expr_contexts; vectorized::VExprContextSPtrs _conjuncts; }; +class MultiCastDataStreamerSourceOperatorX; + +class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<MultiCastDependency> { +public: + ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState); + using Base = PipelineXLocalState<MultiCastDependency>; + using Parent = MultiCastDataStreamerSourceOperatorX; + MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent) + : Base(state, parent) {}; + + Status init(RuntimeState* state, LocalStateInfo& info) override; + friend class MultiCastDataStreamerSourceOperatorX; + vectorized::VExprContextSPtrs _output_expr_contexts; +}; +class MultiCastDataStreamerSourceOperatorX final + : public OperatorX<MultiCastDataStreamSourceLocalState> { +public: + using Base = OperatorX<MultiCastDataStreamSourceLocalState>; + MultiCastDataStreamerSourceOperatorX(const int consumer_id, ObjectPool* pool, + const TDataStreamSink& sink, + const RowDescriptor& row_descriptor, int id) + : Base(pool, id), + _consumer_id(consumer_id), + _t_data_stream_sink(sink), + _row_descriptor(row_descriptor) {}; + ~MultiCastDataStreamerSourceOperatorX() override = default; + Dependency* wait_for_dependency(RuntimeState* state) override { + auto& local_state = + state->get_local_state(id())->cast<MultiCastDataStreamSourceLocalState>(); + if (local_state._shared_state->_multi_cast_data_streamer->can_read(_consumer_id)) { + return nullptr; + 
} else { + return local_state._dependency; + } + } + + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); + // RETURN_IF_ERROR(vectorized::RuntimeFilterConsumer::init(state)); + // init profile for runtime filter + // RuntimeFilterConsumer::_init_profile(local_state._shared_state->_multi_cast_data_streamer->profile()); + if (_t_data_stream_sink.__isset.output_exprs) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.output_exprs, + _output_expr_contexts)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_contexts, state, _row_desc())); + } + + if (_t_data_stream_sink.__isset.conjuncts) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts, + _conjuncts)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state, _row_desc())); + } + return Status::OK(); + } + + Status open(RuntimeState* state) override { + RETURN_IF_ERROR(Base::open(state)); + if (_t_data_stream_sink.__isset.output_exprs) { + RETURN_IF_ERROR(vectorized::VExpr::open(_output_expr_contexts, state)); + } + if (_t_data_stream_sink.__isset.conjuncts) { + RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state)); + } + return Status::OK(); + } + + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override; + + bool is_source() const override { return true; } + +private: + friend class MultiCastDataStreamSourceLocalState; + const int _consumer_id; + const TDataStreamSink _t_data_stream_sink; + vectorized::VExprContextSPtrs _output_expr_contexts; + const RowDescriptor& _row_descriptor; + const RowDescriptor& _row_desc() { return _row_descriptor; } +}; + +// sink operator + +class MultiCastDataStreamSinkOperatorX; +class MultiCastDataStreamSinkLocalState final + : public PipelineXSinkLocalState<MultiCastDependency> { + ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState); + MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, 
RuntimeState* state) + : Base(parent, state) {} + std::shared_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer(); + friend class MultiCastDataStreamSinkOperatorX; + friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>; + using Base = PipelineXSinkLocalState<MultiCastDependency>; + using Parent = MultiCastDataStreamSinkOperatorX; +}; +class MultiCastDataStreamSinkOperatorX final + : public DataSinkOperatorX<MultiCastDataStreamSinkLocalState> { + using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>; + +public: + friend class UnionSinkLocalState; + MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources, + const int cast_sender_count, ObjectPool* pool, + const TMultiCastDataStreamSink& sink, + const RowDescriptor& row_desc) + : Base(sink_id, sources), + _pool(pool), + _row_desc(row_desc), + _cast_sender_count(cast_sender_count) {} + ~MultiCastDataStreamSinkOperatorX() override = default; + Status init(const TDataSink& tsink) override { return Status::OK(); } + + Status open(doris::RuntimeState* state) override { return Status::OK(); }; Review Comment: Do not need to override? ########## be/src/pipeline/pipeline_x/pipeline_x_task.h: ########## @@ -115,7 +115,14 @@ class PipelineXTask : public PipelineTask { return false; } - DependencySPtr& get_downstream_dependency() { return _downstream_dependency; } + std::vector<DependencySPtr>& get_downstream_dependency() { return _downstream_dependency; } + + void set_multi_upstream_dependency(std::vector<DependencySPtr>& multi_upstream_dependency) { Review Comment: I think you should modify `set_upstream_dependency` directly -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org