This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bd582aee75a [pipelineX](minor) refine code (#25015)
bd582aee75a is described below

commit bd582aee75adc5d82e65280dca18b731ecf08755
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Sat Oct 7 10:45:33 2023 +0800

    [pipelineX](minor) refine code (#25015)
---
 be/src/pipeline/exec/analytic_sink_operator.cpp    |  2 +
 be/src/pipeline/exec/analytic_sink_operator.h      |  2 +-
 ...ream_sink.h => multi_cast_data_stream_sink.cpp} | 31 +++------
 be/src/pipeline/exec/multi_cast_data_stream_sink.h | 74 +++++++++++++++++++++-
 .../exec/multi_cast_data_stream_source.cpp         | 12 ++--
 .../pipeline/exec/multi_cast_data_stream_source.h  | 71 +--------------------
 be/src/pipeline/exec/union_source_operator.cpp     |  2 +-
 be/src/pipeline/exec/union_source_operator.h       |  2 +-
 be/src/pipeline/pipeline_x/dependency.h            |  6 +-
 be/src/pipeline/pipeline_x/operator.cpp            | 19 ++----
 be/src/pipeline/pipeline_x/operator.h              |  4 --
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  1 +
 12 files changed, 104 insertions(+), 122 deletions(-)

diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 2b35e1b6a2c..d839be0dc16 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -215,4 +215,6 @@ Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
     return Status::OK();
 }
 
+template class DataSinkOperatorX<AnalyticSinkLocalState>;
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index 41d276205be..c8583925865 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -102,4 +102,4 @@ private:
 };
 
 } // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
similarity index 57%
copy from be/src/pipeline/exec/multi_cast_data_stream_sink.h
copy to be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
index e137a7e6558..b44f15d13e9 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
@@ -15,33 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#pragma once
-
-#include "operator.h"
-#include "vec/sink/multi_cast_data_stream_sink.h"
+#include "multi_cast_data_stream_sink.h"
 
 namespace doris::pipeline {
 
-class MultiCastDataStreamSinkOperatorBuilder final
-        : public DataSinkOperatorBuilder<vectorized::MultiCastDataStreamSink> {
-public:
-    MultiCastDataStreamSinkOperatorBuilder(int32_t id, DataSink* sink)
-            : DataSinkOperatorBuilder(id, "MultiCastDataStreamSinkOperator", sink) {}
-
-    OperatorPtr build_operator() override;
-};
-
-class MultiCastDataStreamSinkOperator final
-        : public DataSinkOperator<MultiCastDataStreamSinkOperatorBuilder> {
-public:
-    MultiCastDataStreamSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink)
-            : DataSinkOperator(operator_builder, sink) {}
-
-    bool can_write() override { return true; }
-};
-
 OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() {
     return std::make_shared<MultiCastDataStreamSinkOperator>(this, _sink);
 }
 
+Status MultiCastDataStreamSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    auto& p = _parent->cast<MultiCastDataStreamSinkOperatorX>();
+    _shared_state->multi_cast_data_streamer = std::make_shared<pipeline::MultiCastDataStreamer>(
+            p._row_desc, p._pool, p._cast_sender_count);
+    return Status::OK();
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
index e137a7e6558..f949b624c7b 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
 #include "vec/sink/multi_cast_data_stream_sink.h"
 
 namespace doris::pipeline {
@@ -40,8 +41,75 @@ public:
     bool can_write() override { return true; }
 };
 
-OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() {
-    return std::make_shared<MultiCastDataStreamSinkOperator>(this, _sink);
-}
+class MultiCastDataStreamSinkOperatorX;
+class MultiCastDataStreamSinkLocalState final
+        : public PipelineXSinkLocalState<MultiCastDependency> {
+    ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState);
+    MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state) {}
+
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    friend class MultiCastDataStreamSinkOperatorX;
+    friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
+    using Base = PipelineXSinkLocalState<MultiCastDependency>;
+    using Parent = MultiCastDataStreamSinkOperatorX;
+
+private:
+    std::shared_ptr<pipeline::MultiCastDataStreamer> _multi_cast_data_streamer;
+};
+
+class MultiCastDataStreamSinkOperatorX final
+        : public DataSinkOperatorX<MultiCastDataStreamSinkLocalState> {
+    using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
+
+public:
+    MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources,
+                                     const int cast_sender_count, ObjectPool* pool,
+                                     const TMultiCastDataStreamSink& sink,
+                                     const RowDescriptor& row_desc)
+            : Base(sink_id, sources),
+              _pool(pool),
+              _row_desc(row_desc),
+              _cast_sender_count(cast_sender_count) {}
+    ~MultiCastDataStreamSinkOperatorX() override = default;
+    Status init(const TDataSink& tsink) override { return Status::OK(); }
+
+    Status open(doris::RuntimeState* state) override { return Status::OK(); };
+
+    Status prepare(RuntimeState* state) override { return Status::OK(); }
+
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override {
+        CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+        SCOPED_TIMER(local_state.profile()->total_time_counter());
+        COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
+        if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
+            COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
+            auto st = local_state._shared_state->multi_cast_data_streamer->push(
+                    state, in_block, source_state == SourceState::FINISHED);
+            // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished
+            if (st.template is<ErrorCode::END_OF_FILE>()) {
+                return Status::OK();
+            }
+            return st;
+        }
+        return Status::OK();
+    }
+
+    RowDescriptor& row_desc() override { return _row_desc; }
+
+    std::shared_ptr<pipeline::MultiCastDataStreamer> create_multi_cast_data_streamer() {
+        auto multi_cast_data_streamer = std::make_shared<pipeline::MultiCastDataStreamer>(
+                _row_desc, _pool, _cast_sender_count);
+        return multi_cast_data_streamer;
+    }
+
+private:
+    friend class MultiCastDataStreamSinkLocalState;
+    ObjectPool* _pool;
+    RowDescriptor _row_desc;
+    int _cast_sender_count;
+    friend class MultiCastDataStreamSinkLocalState;
+};
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index c0e7b146594..c70d87f59e0 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -130,11 +130,9 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
     SCOPED_TIMER(profile()->total_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<Parent>();
-    if (p._t_data_stream_sink.__isset.output_exprs) {
-        _output_expr_contexts.resize(p._output_expr_contexts.size());
-        for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
-            RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i]));
-        }
+    _output_expr_contexts.resize(p._output_expr_contexts.size());
+    for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
+        RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i]));
     }
     return Status::OK();
 }
@@ -150,7 +148,7 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
     if (!local_state._output_expr_contexts.empty()) {
         output_block = &tmp_block;
     }
-    local_state._shared_state->_multi_cast_data_streamer->pull(_consumer_id, output_block, &eos);
+    local_state._shared_state->multi_cast_data_streamer->pull(_consumer_id, output_block, &eos);
 
     if (!local_state._conjuncts.empty()) {
         RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block,
@@ -162,9 +160,11 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
                 local_state._output_expr_contexts, *output_block, block));
         materialize_block_inplace(*block);
     }
+    COUNTER_UPDATE(local_state._rows_returned_counter, block->rows());
     if (eos) {
         source_state = SourceState::FINISHED;
     }
     return Status::OK();
 }
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index aa20272d07b..3d2b8157fa7 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -108,6 +108,7 @@ public:
 private:
     vectorized::VExprContextSPtrs _output_expr_contexts;
 };
+
 class MultiCastDataStreamerSourceOperatorX final
         : public OperatorX<MultiCastDataStreamSourceLocalState> {
 public:
@@ -169,73 +170,5 @@ private:
     const RowDescriptor& _row_desc() { return _row_descriptor; }
 };
 
-// sink operator
-
-class MultiCastDataStreamSinkOperatorX;
-class MultiCastDataStreamSinkLocalState final
-        : public PipelineXSinkLocalState<MultiCastDependency> {
-    ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState);
-    MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
-            : Base(parent, state) {}
-    friend class MultiCastDataStreamSinkOperatorX;
-    friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
-    using Base = PipelineXSinkLocalState<MultiCastDependency>;
-    using Parent = MultiCastDataStreamSinkOperatorX;
-};
-
-class MultiCastDataStreamSinkOperatorX final
-        : public DataSinkOperatorX<MultiCastDataStreamSinkLocalState> {
-    using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
-
-public:
-    friend class UnionSinkLocalState;
-    MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources,
-                                     const int cast_sender_count, ObjectPool* pool,
-                                     const TMultiCastDataStreamSink& sink,
-                                     const RowDescriptor& row_desc)
-            : Base(sink_id, sources),
-              _pool(pool),
-              _row_desc(row_desc),
-              _cast_sender_count(cast_sender_count) {}
-    ~MultiCastDataStreamSinkOperatorX() override = default;
-    Status init(const TDataSink& tsink) override { return Status::OK(); }
-
-    Status open(doris::RuntimeState* state) override { return Status::OK(); };
-
-    Status prepare(RuntimeState* state) override { return Status::OK(); }
-
-    Status sink(RuntimeState* state, vectorized::Block* in_block,
-                SourceState source_state) override {
-        CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
-        SCOPED_TIMER(local_state.profile()->total_time_counter());
-        COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
-        if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
-            COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
-            auto st = local_state._shared_state->_multi_cast_data_streamer->push(
-                    state, in_block, source_state == SourceState::FINISHED);
-            // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished
-            if (st.template is<ErrorCode::END_OF_FILE>()) {
-                return Status::OK();
-            }
-            return st;
-        }
-        return Status::OK();
-    }
-
-    std::shared_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer() {
-        auto multi_cast_data_streamer = std::make_shared<pipeline::MultiCastDataStreamer>(
-                _row_desc, _pool, _cast_sender_count);
-        return multi_cast_data_streamer;
-    }
-
-    RowDescriptor& row_desc() override { return _row_desc; }
-
-private:
-    ObjectPool* _pool;
-    RowDescriptor _row_desc;
-    int _cast_sender_count;
-    friend class MultiCastDataStreamSinkLocalState;
-};
-
 } // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 67a8ac6d4af..54a9603e7f6 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -124,7 +124,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     return Status::OK();
 }
 
-std::shared_ptr<DataQueue> UnionSourceLocalState::data_queue() {
+std::shared_ptr<DataQueue> UnionSourceLocalState::create_data_queue() {
     auto& p = _parent->cast<Parent>();
     std::shared_ptr<DataQueue> data_queue = std::make_shared<DataQueue>(p._child_size, _dependency);
     return data_queue;
diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h
index a22a0e8c0a2..d02176fc8de 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -78,7 +78,7 @@ public:
     UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {};
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
-    std::shared_ptr<DataQueue> data_queue();
+    std::shared_ptr<DataQueue> create_data_queue();
 
 private:
     friend class UnionSourceOperatorX;
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 2ea0992bebc..c69b49870d5 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -111,7 +111,7 @@ protected:
 class WriteDependency : public Dependency {
 public:
     WriteDependency(int id, std::string name) : Dependency(id, name), _ready_for_write(true) {}
-    virtual ~WriteDependency() = default;
+    ~WriteDependency() override = default;
 
     bool is_write_dependency() override { return true; }
 
@@ -428,7 +428,7 @@ private:
 
 struct MultiCastSharedState {
 public:
-    std::shared_ptr<pipeline::MultiCastDataStreamer> _multi_cast_data_streamer;
+    std::shared_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer;
 };
 
 class MultiCastDependency final : public WriteDependency {
@@ -438,7 +438,7 @@ public:
     ~MultiCastDependency() override = default;
     void* shared_state() override { return (void*)&_multi_cast_state; };
     MultiCastDependency* can_read(const int consumer_id) {
-        if (_multi_cast_state._multi_cast_data_streamer->can_read(consumer_id)) {
+        if (_multi_cast_state.multi_cast_data_streamer->can_read(consumer_id)) {
             return nullptr;
         } else {
             return this;
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index c6331b04fbe..81bee3063de 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -37,6 +37,7 @@
 #include "pipeline/exec/jdbc_scan_operator.h"
 #include "pipeline/exec/jdbc_table_sink_operator.h"
 #include "pipeline/exec/meta_scan_operator.h"
+#include "pipeline/exec/multi_cast_data_stream_sink.h"
 #include "pipeline/exec/multi_cast_data_stream_source.h"
 #include "pipeline/exec/nested_loop_join_build_operator.h"
 #include "pipeline/exec/nested_loop_join_probe_operator.h"
@@ -257,20 +258,14 @@ Status DataSinkOperatorXBase::init(const TPlanNode& tnode, RuntimeState* state)
     return Status::OK();
 }
 
-template <typename LocalStateType>
-Status DataSinkOperatorX<LocalStateType>::setup_local_state(RuntimeState* state,
-                                                            LocalSinkStateInfo& info) {
-    auto local_state = LocalStateType::create_shared(this, state);
-    state->emplace_sink_local_state(id(), local_state);
-    return local_state->init(state, info);
-}
-
 template <typename LocalStateType>
 Status DataSinkOperatorX<LocalStateType>::setup_local_states(
         RuntimeState* state, std::vector<LocalSinkStateInfo>& infos) {
     DCHECK(infos.size() == 1);
     for (auto& info : infos) {
-        RETURN_IF_ERROR(setup_local_state(state, info));
+        auto local_state = LocalStateType::create_shared(this, state);
+        state->emplace_sink_local_state(id(), local_state);
+        RETURN_IF_ERROR(local_state->init(state, info));
     }
     return Status::OK();
 }
@@ -279,12 +274,12 @@ template <>
 Status DataSinkOperatorX<MultiCastDataStreamSinkLocalState>::setup_local_states(
         RuntimeState* state, std::vector<LocalSinkStateInfo>& infos) {
     auto multi_cast_data_streamer =
-            static_cast<MultiCastDataStreamSinkOperatorX*>(this)->multi_cast_data_streamer();
+            static_cast<MultiCastDataStreamSinkOperatorX*>(this)->create_multi_cast_data_streamer();
     for (auto& info : infos) {
         auto local_state = MultiCastDataStreamSinkLocalState::create_shared(this, state);
         state->emplace_sink_local_state(id(), local_state);
         RETURN_IF_ERROR(local_state->init(state, info));
-        local_state->_shared_state->_multi_cast_data_streamer = multi_cast_data_streamer;
+        local_state->_shared_state->multi_cast_data_streamer = multi_cast_data_streamer;
     }
 
     return Status::OK();
@@ -331,7 +326,7 @@ Status OperatorX<UnionSourceLocalState>::setup_local_states(RuntimeState* state,
         RETURN_IF_ERROR(local_state->init(state, info));
         if (child_count != 0) {
             if (!data_queue) {
-                data_queue = local_state->data_queue();
+                data_queue = local_state->create_data_queue();
             }
             local_state->_shared_state->data_queue = data_queue;
         }
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 53a294412a2..149d28265e3 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -419,8 +419,6 @@ public:
     Status prepare(RuntimeState* state) override { return Status::OK(); }
     Status open(RuntimeState* state) override { return Status::OK(); }
 
-    virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) = 0;
-
     virtual Status setup_local_states(RuntimeState* state,
                                       std::vector<LocalSinkStateInfo>& infos) = 0;
 
@@ -529,8 +527,6 @@ public:
             : DataSinkOperatorXBase(id, sources) {}
     ~DataSinkOperatorX() override = default;
 
-    Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override;
-
     Status setup_local_states(RuntimeState* state, std::vector<LocalSinkStateInfo>& infos) override;
     void get_dependency(std::vector<DependencySPtr>& dependency) override;
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 4dedbce44f1..85a9527eeb6 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -60,6 +60,7 @@
 #include "pipeline/exec/jdbc_scan_operator.h"
 #include "pipeline/exec/jdbc_table_sink_operator.h"
 #include "pipeline/exec/meta_scan_operator.h"
+#include "pipeline/exec/multi_cast_data_stream_sink.h"
 #include "pipeline/exec/multi_cast_data_stream_source.h"
 #include "pipeline/exec/nested_loop_join_build_operator.h"
 #include "pipeline/exec/nested_loop_join_probe_operator.h"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to