github-actions[bot] commented on code in PR #14787:
URL: https://github.com/apache/doris/pull/14787#discussion_r1038950499


##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = 
nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) 
{};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status prepare(RuntimeState* state) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@ class Operator {
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = 
nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) 
{};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {

Review Comment:
   warning: expected member name or ';' after declaration specifiers [clang-diagnostic-error]
   ```cpp
   )) {};
   ^
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = 
nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) 
{};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new 
RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + 
_runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == 
SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return 
Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) 
{};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new 
RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), 
true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + 
_runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == 
SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : 
SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return 
Status::OK(); }
+
+    virtual bool can_read() override { return _node->can_read(); }
 
 protected:
-    const int32_t _id;
-    const std::string _name;
-    ExecNode* _related_exec_node;
+    void _fresh_exec_timer(NodeType* node) {
+        node->runtime_profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    RuntimeState* _state = nullptr;
-    bool _is_closed = false;
+    NodeType* _node;
 };
 
-using OperatorBuilderPtr = std::shared_ptr<OperatorBuilder>;
-using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+template <typename OperatorBuilderType>
+class DataStateOperator : public Operator<OperatorBuilderType> {
+public:
+    using NodeType =
+            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
+
+    DataStateOperator(OperatorBuilderBase* builder, ExecNode* node)
+            : Operator<OperatorBuilderType>(builder, node),
+              _child_block(new vectorized::Block),
+              _child_source_state(SourceState::DEPEND_ON_SOURCE) {};
+
+E) {};
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,

Review Comment:
   warning: unknown type name 'E' [clang-diagnostic-error]
   ```cpp
   E) {};
   ^
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = 
nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) 
{};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new 
RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + 
_runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == 
SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return 
Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) 
{};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new 
RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), 
true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + 
_runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == 
SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : 
SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return 
Status::OK(); }
+
+    virtual bool can_read() override { return _node->can_read(); }
 
 protected:
-    const int32_t _id;
-    const std::string _name;
-    ExecNode* _related_exec_node;
+    void _fresh_exec_timer(NodeType* node) {
+        node->runtime_profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    RuntimeState* _state = nullptr;
-    bool _is_closed = false;
+    NodeType* _node;
 };
 
-using OperatorBuilderPtr = std::shared_ptr<OperatorBuilder>;
-using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+template <typename OperatorBuilderType>
+class DataStateOperator : public Operator<OperatorBuilderType> {
+public:
+    using NodeType =
+            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
+
+    DataStateOperator(OperatorBuilderBase* builder, ExecNode* node)
+            : Operator<OperatorBuilderType>(builder, node),
+              _child_block(new vectorized::Block),
+              _child_source_state(SourceState::DEPEND_ON_SOURCE) {};
+
+E) {};
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status get_block(RuntimeState* state, vectorized::Block* block,
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = 
nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) 
{};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new 
RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + 
_runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == 
SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return 
Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) 
{};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new 
RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), 
true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + 
_runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == 
SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : 
SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return 
Status::OK(); }
+
+    virtual bool can_read() override { return _node->can_read(); }
 
 protected:
-    const int32_t _id;
-    const std::string _name;
-    ExecNode* _related_exec_node;
+    void _fresh_exec_timer(NodeType* node) {
+        node->runtime_profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    RuntimeState* _state = nullptr;
-    bool _is_closed = false;
+    NodeType* _node;
 };
 
-using OperatorBuilderPtr = std::shared_ptr<OperatorBuilder>;
-using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+template <typename OperatorBuilderType>
+class DataStateOperator : public Operator<OperatorBuilderType> {
+public:
+    using NodeType =
+            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
+
+    DataStateOperator(OperatorBuilderBase* builder, ExecNode* node)
+            : Operator<OperatorBuilderType>(builder, node),
+              _child_block(new vectorized::Block),
+              _child_source_state(SourceState::DEPEND_ON_SOURCE) {};
+
+E) {};
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,

Review Comment:
   warning: expected member name or ';' after declaration specifiers [clang-diagnostic-error]
   ```cpp
   E) {};
    ^
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to