github-actions[bot] commented on code in PR #14787:
URL: https://github.com/apache/doris/pull/14787#discussion_r1038954502


##########
be/src/pipeline/exec/operator.h:
##########
@@ -33,55 +42,121 @@
     FINISHED = 2
 };
 
-//
 enum class SinkState : uint8_t {
     SINK_IDLE = 0, // can send block to sink
     SINK_BUSY = 1, // sink buffer is full, should wait sink to send some block
     FINISHED = 2
 };
 ////////////////       DO NOT USE THE UP State     ////////////////
 
-class OperatorBuilder;
-class Operator;
+class OperatorBuilderBase;
+class OperatorBase;
 
-using OperatorPtr = std::shared_ptr<Operator>;
+using OperatorPtr = std::shared_ptr<OperatorBase>;
 using Operators = std::vector<OperatorPtr>;
 
-class Operator {
+using OperatorBuilderPtr = std::shared_ptr<OperatorBuilderBase>;
+using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+
+class OperatorBuilderBase {
 public:
-    explicit Operator(OperatorBuilder* operator_builder);
-    virtual ~Operator() = default;
+    OperatorBuilderBase(int32_t id, const std::string& name) : _id(id), 
_name(name) {}
+
+    virtual ~OperatorBuilderBase() = default;
+
+    virtual OperatorPtr build_operator() = 0;
+
+    virtual bool is_sink() const { return false; }
+    virtual bool is_source() const { return false; }
+
+    // create the object used by all operator
+    virtual Status prepare(RuntimeState* state);
+
+    // destory the object used by all operator
+    virtual void close(RuntimeState* state);
+
+    std::string get_name() const { return _name; }
+
+    RuntimeState* runtime_state() { return _state; }
+
+    virtual const RowDescriptor& row_desc() = 0;
+
+    int32_t id() const { return _id; }
+
+protected:
+    const int32_t _id;
+    const std::string _name;
+
+    RuntimeState* _state = nullptr;
+    bool _is_closed = false;
+};
+
+template <typename NodeType>
+class OperatorBuilder : public OperatorBuilderBase {
+public:
+    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = 
nullptr)
+            : OperatorBuilderBase(id, name), 
_node(reinterpret_cast<NodeType*>(exec_node)) {}
+
+    virtual ~OperatorBuilder() = default;
+
+    const RowDescriptor& row_desc() override { return _node->row_desc(); }
+
+    NodeType* exec_node() const { return _node; }
+
+protected:
+    NodeType* _node;
+};
+
+template <typename SinkType>
+class DataSinkOperatorBuilder : public OperatorBuilderBase {
+public:
+    DataSinkOperatorBuilder(int32_t id, const std::string& name, DataSink* 
sink = nullptr)
+            : OperatorBuilderBase(id, name), 
_sink(reinterpret_cast<SinkType*>(sink)) {}
+
+    virtual ~DataSinkOperatorBuilder() = default;

Review Comment:
   warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' 
[modernize-use-override]
   
   ```suggestion
   override 
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = 
nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) 
{};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() = default;

Review Comment:
   warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' 
[modernize-use-override]
   
   ```suggestion
   )) {};override 
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = 
nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) 
{};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new 
RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + 
_runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == 
SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return 
Status::OK(); }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    NodeType* _sink;
+};
 
-    int32_t id() const { return _id; }
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
+
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) 
{};
+
+    virtual ~Operator() = default;

Review Comment:
   warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' 
[modernize-use-override]
   
   ```suggestion
   )) {};override 
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -33,55 +42,121 @@ enum class SourceState : uint8_t {
     FINISHED = 2
 };
 
-//
 enum class SinkState : uint8_t {
     SINK_IDLE = 0, // can send block to sink
     SINK_BUSY = 1, // sink buffer is full, should wait sink to send some block
     FINISHED = 2
 };
 ////////////////       DO NOT USE THE UP State     ////////////////
 
-class OperatorBuilder;
-class Operator;
+class OperatorBuilderBase;
+class OperatorBase;
 
-using OperatorPtr = std::shared_ptr<Operator>;
+using OperatorPtr = std::shared_ptr<OperatorBase>;
 using Operators = std::vector<OperatorPtr>;
 
-class Operator {
+using OperatorBuilderPtr = std::shared_ptr<OperatorBuilderBase>;
+using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+
+class OperatorBuilderBase {
 public:
-    explicit Operator(OperatorBuilder* operator_builder);
-    virtual ~Operator() = default;
+    OperatorBuilderBase(int32_t id, const std::string& name) : _id(id), 
_name(name) {}
+
+    virtual ~OperatorBuilderBase() = default;
+
+    virtual OperatorPtr build_operator() = 0;
+
+    virtual bool is_sink() const { return false; }
+    virtual bool is_source() const { return false; }
+
+    // create the object used by all operator
+    virtual Status prepare(RuntimeState* state);
+
+    // destory the object used by all operator
+    virtual void close(RuntimeState* state);
+
+    std::string get_name() const { return _name; }
+
+    RuntimeState* runtime_state() { return _state; }
+
+    virtual const RowDescriptor& row_desc() = 0;
+
+    int32_t id() const { return _id; }
+
+protected:
+    const int32_t _id;
+    const std::string _name;
+
+    RuntimeState* _state = nullptr;
+    bool _is_closed = false;
+};
+
+template <typename NodeType>
+class OperatorBuilder : public OperatorBuilderBase {
+public:
+    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = 
nullptr)
+            : OperatorBuilderBase(id, name), 
_node(reinterpret_cast<NodeType*>(exec_node)) {}
+
+    virtual ~OperatorBuilder() = default;

Review Comment:
   warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' 
[modernize-use-override]
   
   ```suggestion
   override 
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to