Gabriel39 commented on code in PR #25140:
URL: https://github.com/apache/doris/pull/25140#discussion_r1351229718


##########
be/src/pipeline/exec/set_source_operator.h:
##########
@@ -52,5 +53,67 @@ class SetSourceOperator : public 
SourceOperator<SetSourceOperatorBuilder<is_inte
     Status open(RuntimeState* /*state*/) override { return Status::OK(); }
 };
 
+template <bool is_intersect>
+class SetSourceOperatorX;
+
+template <bool is_intersect>
+class SetSourceLocalState final : public PipelineXLocalState<SetDependency> {
+public:
+    ENABLE_FACTORY_CREATOR(SetSourceLocalState);
+    using Base = PipelineXLocalState<SetDependency>;
+    using Parent = SetSourceOperatorX<is_intersect>;
+    SetSourceLocalState(RuntimeState* state, OperatorXBase* parent) : 
Base(state, parent) {};
+
+    Status init(RuntimeState* state, LocalStateInfo& info) override {
+        _pull_timer = ADD_TIMER(profile(), "PullTime");
+        return Status::OK();
+    }
+
+private:
+    friend class SetSourceOperatorX<is_intersect>;
+    friend class OperatorX<SetSourceLocalState>;
+    RuntimeProfile::Counter* _pull_timer; // time to pull data
+};
+
+template <bool is_intersect>
+class SetSourceOperatorX final : public 
OperatorX<SetSourceLocalState<is_intersect>> {
+public:
+    using Base = OperatorX<SetSourceLocalState<is_intersect>>;
+    // for non-delay tempalte instantiation
+    using OperatorXBase::id;
+    using typename Base::LocalState;
+
+    SetSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs)
+            : Base(pool, tnode, descs) {};
+    ~SetSourceOperatorX() override = default;
+
+    Dependency* wait_for_dependency(RuntimeState* state) override {
+        CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);
+        return local_state._dependency->read_blocked_by();
+    }
+
+    [[nodiscard]] bool is_source() const override { return true; }
+
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     SourceState& source_state) override;
+
+private:
+    friend class SetSourceLocalState<is_intersect>;
+
+    void create_mutable_cols(SetSourceLocalState<is_intersect>& local_state,
+                             vectorized::Block* output_block);
+
+    template <typename HashTableContext>
+    Status get_data_in_hashtable(SetSourceLocalState<is_intersect>& 
local_state,
+                                 HashTableContext& hash_table_ctx, 
vectorized::Block* output_block,
+                                 const int batch_size, SourceState& 
source_state);
+
+    void add_result_columns(SetSourceLocalState<is_intersect>& local_state,
+                            vectorized::RowRefListWithFlags& value, int& 
block_size);
+
+    using Base::_conjuncts;
+    std::vector<vectorized::MutableColumnPtr> _mutable_cols;

Review Comment:
    Mutable variables should be hold in local state instead of this global 
operator.



##########
be/src/pipeline/exec/set_source_operator.h:
##########
@@ -52,5 +53,67 @@ class SetSourceOperator : public 
SourceOperator<SetSourceOperatorBuilder<is_inte
     Status open(RuntimeState* /*state*/) override { return Status::OK(); }
 };
 
+template <bool is_intersect>
+class SetSourceOperatorX;
+
+template <bool is_intersect>
+class SetSourceLocalState final : public PipelineXLocalState<SetDependency> {
+public:
+    ENABLE_FACTORY_CREATOR(SetSourceLocalState);
+    using Base = PipelineXLocalState<SetDependency>;
+    using Parent = SetSourceOperatorX<is_intersect>;
+    SetSourceLocalState(RuntimeState* state, OperatorXBase* parent) : 
Base(state, parent) {};
+
+    Status init(RuntimeState* state, LocalStateInfo& info) override {
+        _pull_timer = ADD_TIMER(profile(), "PullTime");
+        return Status::OK();
+    }
+
+private:
+    friend class SetSourceOperatorX<is_intersect>;
+    friend class OperatorX<SetSourceLocalState>;
+    RuntimeProfile::Counter* _pull_timer; // time to pull data
+};
+
+template <bool is_intersect>
+class SetSourceOperatorX final : public 
OperatorX<SetSourceLocalState<is_intersect>> {
+public:
+    using Base = OperatorX<SetSourceLocalState<is_intersect>>;
+    // for non-delay tempalte instantiation
+    using OperatorXBase::id;
+    using typename Base::LocalState;
+
+    SetSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs)
+            : Base(pool, tnode, descs) {};
+    ~SetSourceOperatorX() override = default;
+
+    Dependency* wait_for_dependency(RuntimeState* state) override {
+        CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);
+        return local_state._dependency->read_blocked_by();
+    }
+
+    [[nodiscard]] bool is_source() const override { return true; }
+
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     SourceState& source_state) override;
+
+private:
+    friend class SetSourceLocalState<is_intersect>;
+
+    void create_mutable_cols(SetSourceLocalState<is_intersect>& local_state,

Review Comment:
   Private function should have a name with prefix `_`



##########
be/src/pipeline/exec/set_source_operator.cpp:
##########
@@ -48,4 +48,116 @@ template class SetSourceOperatorBuilder<false>;
 template class SetSourceOperator<true>;
 template class SetSourceOperator<false>;
 
+template <bool is_intersect>
+Status SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state, 
vectorized::Block* block,
+                                                   SourceState& source_state) {
+    RETURN_IF_CANCELLED(state);
+    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    SCOPED_TIMER(local_state._pull_timer);

Review Comment:
   We should also count this time into `total_timer`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to