Gabriel39 commented on code in PR #25140: URL: https://github.com/apache/doris/pull/25140#discussion_r1351229718
########## be/src/pipeline/exec/set_source_operator.h: ########## @@ -52,5 +53,67 @@ class SetSourceOperator : public SourceOperator<SetSourceOperatorBuilder<is_inte Status open(RuntimeState* /*state*/) override { return Status::OK(); } }; +template <bool is_intersect> +class SetSourceOperatorX; + +template <bool is_intersect> +class SetSourceLocalState final : public PipelineXLocalState<SetDependency> { +public: + ENABLE_FACTORY_CREATOR(SetSourceLocalState); + using Base = PipelineXLocalState<SetDependency>; + using Parent = SetSourceOperatorX<is_intersect>; + SetSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}; + + Status init(RuntimeState* state, LocalStateInfo& info) override { + _pull_timer = ADD_TIMER(profile(), "PullTime"); + return Status::OK(); + } + +private: + friend class SetSourceOperatorX<is_intersect>; + friend class OperatorX<SetSourceLocalState>; + RuntimeProfile::Counter* _pull_timer; // time to pull data +}; + +template <bool is_intersect> +class SetSourceOperatorX final : public OperatorX<SetSourceLocalState<is_intersect>> { +public: + using Base = OperatorX<SetSourceLocalState<is_intersect>>; + // for non-delay tempalte instantiation + using OperatorXBase::id; + using typename Base::LocalState; + + SetSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : Base(pool, tnode, descs) {}; + ~SetSourceOperatorX() override = default; + + Dependency* wait_for_dependency(RuntimeState* state) override { + CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); + return local_state._dependency->read_blocked_by(); + } + + [[nodiscard]] bool is_source() const override { return true; } + + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override; + +private: + friend class SetSourceLocalState<is_intersect>; + + void create_mutable_cols(SetSourceLocalState<is_intersect>& local_state, + vectorized::Block* output_block); + + template <typename 
HashTableContext> + Status get_data_in_hashtable(SetSourceLocalState<is_intersect>& local_state, + HashTableContext& hash_table_ctx, vectorized::Block* output_block, + const int batch_size, SourceState& source_state); + + void add_result_columns(SetSourceLocalState<is_intersect>& local_state, + vectorized::RowRefListWithFlags& value, int& block_size); + + using Base::_conjuncts; + std::vector<vectorized::MutableColumnPtr> _mutable_cols; Review Comment: Mutable variables should be held in the local state instead of in this global operator. ########## be/src/pipeline/exec/set_source_operator.h: ########## @@ -52,5 +53,67 @@ class SetSourceOperator : public SourceOperator<SetSourceOperatorBuilder<is_inte Status open(RuntimeState* /*state*/) override { return Status::OK(); } }; +template <bool is_intersect> +class SetSourceOperatorX; + +template <bool is_intersect> +class SetSourceLocalState final : public PipelineXLocalState<SetDependency> { +public: + ENABLE_FACTORY_CREATOR(SetSourceLocalState); + using Base = PipelineXLocalState<SetDependency>; + using Parent = SetSourceOperatorX<is_intersect>; + SetSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}; + + Status init(RuntimeState* state, LocalStateInfo& info) override { + _pull_timer = ADD_TIMER(profile(), "PullTime"); + return Status::OK(); + } + +private: + friend class SetSourceOperatorX<is_intersect>; + friend class OperatorX<SetSourceLocalState>; + RuntimeProfile::Counter* _pull_timer; // time to pull data +}; + +template <bool is_intersect> +class SetSourceOperatorX final : public OperatorX<SetSourceLocalState<is_intersect>> { +public: + using Base = OperatorX<SetSourceLocalState<is_intersect>>; + // for non-delay tempalte instantiation + using OperatorXBase::id; + using typename Base::LocalState; + + SetSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : Base(pool, tnode, descs) {}; + ~SetSourceOperatorX() override = default; + + Dependency* 
wait_for_dependency(RuntimeState* state) override { + CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); + return local_state._dependency->read_blocked_by(); + } + + [[nodiscard]] bool is_source() const override { return true; } + + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override; + +private: + friend class SetSourceLocalState<is_intersect>; + + void create_mutable_cols(SetSourceLocalState<is_intersect>& local_state, Review Comment: Private functions should have names prefixed with `_`. ########## be/src/pipeline/exec/set_source_operator.cpp: ########## @@ -48,4 +48,116 @@ template class SetSourceOperatorBuilder<false>; template class SetSourceOperator<true>; template class SetSourceOperator<false>; +template <bool is_intersect> +Status SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { + RETURN_IF_CANCELLED(state); + CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state); + SCOPED_TIMER(local_state._pull_timer); Review Comment: We should also include this time in `total_timer`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org