This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d80e7dc2fe [Improvement](pipelineX) Improve local exchange on pipelineX engine (#26464)
5d80e7dc2fe is described below

commit 5d80e7dc2fe7a9cc80cc26244146d5dda8070908
Author: Gabriel <[email protected]>
AuthorDate: Tue Nov 7 22:11:44 2023 +0800

    [Improvement](pipelineX) Improve local exchange on pipelineX engine (#26464)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 16 ++++-----
 be/src/pipeline/exec/exchange_sink_operator.h   | 29 +++++++++++++---
 be/src/runtime/runtime_state.h                  |  4 +++
 be/src/vec/runtime/vdata_stream_recvr.cpp       | 46 +++++++++++++++++--------
 be/src/vec/runtime/vdata_stream_recvr.h         | 22 +++++++-----
 be/src/vec/sink/vdata_stream_sender.h           | 12 +++----
 6 files changed, 85 insertions(+), 44 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index b5cceb5a291..e6bcccbabb2 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -192,15 +192,14 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
                 ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", 
timer_name);
     } else if (local_size > 0) {
         size_t dep_id = 0;
-        _channels_dependency.resize(local_size);
+        _local_channels_dependency.resize(local_size);
         _wait_channel_timer.resize(local_size);
         auto deps_for_channels = 
AndDependency::create_shared(_parent->operator_id());
         for (auto channel : channels) {
             if (channel->is_local()) {
-                _channels_dependency[dep_id] =
-                        
ChannelDependency::create_shared(_parent->operator_id());
-                channel->set_dependency(_channels_dependency[dep_id]);
-                deps_for_channels->add_child(_channels_dependency[dep_id]);
+                _local_channels_dependency[dep_id] = 
channel->get_local_channel_dependency();
+                DCHECK(_local_channels_dependency[dep_id] != nullptr);
+                
deps_for_channels->add_child(_local_channels_dependency[dep_id]);
                 _wait_channel_timer[dep_id] =
                         ADD_CHILD_TIMER(_profile, 
"WaitForLocalExchangeBuffer", timer_name);
                 dep_id++;
@@ -428,7 +427,8 @@ Status 
ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vec
     {
         SCOPED_TIMER(state.serialize_batch_timer());
         dest->Clear();
-        size_t uncompressed_bytes = 0, compressed_bytes = 0;
+        size_t uncompressed_bytes = 0;
+        size_t compressed_bytes = 0;
         RETURN_IF_ERROR(src->serialize(_state->be_exec_version(), dest, 
&uncompressed_bytes,
                                        &compressed_bytes, _compression_type,
                                        _transfer_large_data_by_brpc));
@@ -521,9 +521,9 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
         COUNTER_UPDATE(_wait_broadcast_buffer_timer,
                        _broadcast_dependency->write_watcher_elapse_time());
     }
-    for (size_t i = 0; i < _channels_dependency.size(); i++) {
+    for (size_t i = 0; i < _local_channels_dependency.size(); i++) {
         COUNTER_UPDATE(_wait_channel_timer[i],
-                       _channels_dependency[i]->write_watcher_elapse_time());
+                       
_local_channels_dependency[i]->write_watcher_elapse_time());
     }
     _sink_buffer->update_profile(profile());
     _sink_buffer->close();
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 815d3930577..2b42d28958a 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -113,11 +113,30 @@ private:
     std::atomic<int> _available_block;
 };
 
-class ChannelDependency final : public WriteDependency {
+/**
+ * We use this to control the execution for local exchange.
+ *              +---------------+                                    +---------------+                               +---------------+
+ *              | ExchangeSink1 |                                    | ExchangeSink2 |                               | ExchangeSink3 |
+ *              +---------------+                                    +---------------+                               +---------------+
+ *                     |                                                    |                                               |
+ *                     |                       +----------------------------+----------------------------------+            |
+ *                     +----+------------------|------------------------------------------+                    |            |
+ *                          |                  |                 +------------------------|--------------------|------------+-----+
+ *          Dependency 1-1  |   Dependency 2-1 |  Dependency 3-1 |         Dependency 1-2 |    Dependency 2-2  |  Dependency 3-2  |
+ *                    +----------------------------------------------+               +----------------------------------------------+
+ *                    |  queue1              queue2          queue3  |               |  queue1              queue2          queue3  |
+ *                    |                   LocalRecvr                 |               |                   LocalRecvr                 |
+ *                    +----------------------------------------------+               +----------------------------------------------+
+ *                         +-----------------+                                                        +------------------+
+ *                         | ExchangeSource1 |                                                        | ExchangeSource2 |
+ *                         +-----------------+                                                        +------------------+
+ */
+class LocalExchangeChannelDependency final : public WriteDependency {
 public:
-    ENABLE_FACTORY_CREATOR(ChannelDependency);
-    ChannelDependency(int id) : WriteDependency(id, "ChannelDependency") {}
-    ~ChannelDependency() override = default;
+    ENABLE_FACTORY_CREATOR(LocalExchangeChannelDependency);
+    LocalExchangeChannelDependency(int id)
+            : WriteDependency(id, "LocalExchangeChannelDependency") {}
+    ~LocalExchangeChannelDependency() override = default;
 
     void* shared_state() override { return nullptr; }
 };
@@ -209,7 +228,7 @@ private:
     std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency = nullptr;
     std::shared_ptr<AndDependency> _exchange_sink_dependency = nullptr;
     std::shared_ptr<BroadcastDependency> _broadcast_dependency = nullptr;
-    std::vector<std::shared_ptr<ChannelDependency>> _channels_dependency;
+    std::vector<std::shared_ptr<LocalExchangeChannelDependency>> 
_local_channels_dependency;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
     int _partition_count;
 };
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 1c20725cd33..805951e34e3 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -329,6 +329,10 @@ public:
         return _query_options.__isset.enable_pipeline_engine &&
                _query_options.enable_pipeline_engine;
     }
+    bool enable_pipeline_x_exec() const {
+        return _query_options.__isset.enable_pipeline_x_engine &&
+               _query_options.enable_pipeline_x_engine;
+    }
     bool enable_local_shuffle() const {
         return _query_options.__isset.enable_local_shuffle && 
_query_options.enable_local_shuffle;
     }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index b842cbeea48..9384d4abbfc 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -100,7 +100,12 @@ Status 
VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
     _recvr->update_blocks_memory_usage(-block_byte_size);
     _block_queue.pop_front();
     if (_block_queue.size() == 0 && _dependency) {
-        _dependency->block_reading();
+        if (!_is_cancelled && _num_remaining_senders > 0) {
+            _dependency->block_reading();
+        }
+        for (auto& it : _local_channel_dependency) {
+            it->set_ready_for_write();
+        }
     }
 
     if (!_pending_closures.empty()) {
@@ -350,6 +355,14 @@ VDataStreamRecvr::VDataStreamRecvr(
             std::make_unique<MemTracker>("VDataStreamRecvr:" + 
print_id(_fragment_instance_id));
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
+    if (state->enable_pipeline_x_exec()) {
+        _sender_to_local_channel_dependency.resize(num_senders);
+        for (size_t i = 0; i < num_senders; i++) {
+            _sender_to_local_channel_dependency[i] =
+                    
pipeline::LocalExchangeChannelDependency::create_shared(_dest_node_id);
+        }
+    }
+
     // Create one queue per sender if is_merging is true.
     int num_queues = is_merging ? num_senders : 1;
     _sender_queues.reserve(num_queues);
@@ -358,6 +371,16 @@ VDataStreamRecvr::VDataStreamRecvr(
         SenderQueue* queue = nullptr;
         if (_enable_pipeline) {
             queue = _sender_queue_pool.add(new PipSenderQueue(this, 
num_sender_per_queue, profile));
+            if (state->enable_pipeline_x_exec()) {
+                auto dependencies =
+                        is_merging
+                                ? std::vector<std::shared_ptr<
+                                          pipeline::
+                                                  
LocalExchangeChannelDependency>> {_sender_to_local_channel_dependency
+                                                                               
             [i]}
+                                : _sender_to_local_channel_dependency;
+                queue->set_local_channel_dependency(dependencies);
+            }
         } else {
             queue = _sender_queue_pool.add(new SenderQueue(this, 
num_sender_per_queue, profile));
         }
@@ -424,11 +447,11 @@ bool VDataStreamRecvr::sender_queue_empty(int sender_id) {
     return _sender_queues[use_sender_id]->queue_empty();
 }
 
-void 
VDataStreamRecvr::set_dependency(std::shared_ptr<pipeline::ChannelDependency> 
dependency) {
-    _dependency = dependency;
-    for (auto& queue : _sender_queues) {
-        queue->set_channel_dependency(dependency);
-    }
+std::shared_ptr<pipeline::LocalExchangeChannelDependency>
+VDataStreamRecvr::get_local_channel_dependency(int sender_id) {
+    DCHECK_GT(_sender_to_local_channel_dependency.size(), sender_id);
+    DCHECK(_sender_to_local_channel_dependency[sender_id] != nullptr);
+    return _sender_to_local_channel_dependency[sender_id];
 }
 
 bool VDataStreamRecvr::ready_to_read() {
@@ -482,13 +505,6 @@ void VDataStreamRecvr::cancel_stream(Status exec_status) {
 void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) {
     _blocks_memory_usage->add(size);
     _blocks_memory_usage_current_value = _blocks_memory_usage->current_value();
-    if (_dependency && size > 0 &&
-        _blocks_memory_usage_current_value > 
config::exchg_node_buffer_size_bytes && !_is_closed) {
-        _dependency->block_writing();
-    } else if (_dependency && size < 0 &&
-               _blocks_memory_usage_current_value <= 
config::exchg_node_buffer_size_bytes) {
-        _dependency->set_ready_for_write();
-    }
 }
 
 void VDataStreamRecvr::close() {
@@ -496,8 +512,8 @@ void VDataStreamRecvr::close() {
         return;
     }
     _is_closed = true;
-    if (_dependency) {
-        _dependency->set_ready_for_write();
+    for (auto& it : _sender_to_local_channel_dependency) {
+        it->set_ready_for_write();
     }
     for (int i = 0; i < _sender_queues.size(); ++i) {
         _sender_queues[i]->close();
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index b2fd6afdc7f..2f5c88301e2 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -59,7 +59,7 @@ class RuntimeState;
 
 namespace pipeline {
 struct ExchangeDataDependency;
-class ChannelDependency;
+class LocalExchangeChannelDependency;
 } // namespace pipeline
 
 namespace vectorized {
@@ -125,7 +125,8 @@ public:
 
     bool is_closed() const { return _is_closed; }
 
-    void set_dependency(std::shared_ptr<pipeline::ChannelDependency> 
dependency);
+    std::shared_ptr<pipeline::LocalExchangeChannelDependency> 
get_local_channel_dependency(
+            int sender_id);
 
 private:
     void update_blocks_memory_usage(int64_t size);
@@ -183,7 +184,8 @@ private:
     std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
 
     bool _enable_pipeline;
-    std::shared_ptr<pipeline::ChannelDependency> _dependency;
+    std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
+            _sender_to_local_channel_dependency;
 };
 
 class ThreadClosure : public google::protobuf::Closure {
@@ -201,6 +203,12 @@ public:
 
     virtual ~SenderQueue();
 
+    void set_local_channel_dependency(
+            
std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>&
+                    local_channel_dependency) {
+        _local_channel_dependency = local_channel_dependency;
+    }
+
     virtual bool should_wait();
 
     virtual Status get_batch(Block* next_block, bool* eos);
@@ -225,10 +233,6 @@ public:
         _dependency = dependency;
     }
 
-    void set_channel_dependency(std::shared_ptr<pipeline::ChannelDependency> 
channel_dependency) {
-        _channel_dependency = channel_dependency;
-    }
-
 protected:
     Status _inner_get_batch_without_lock(Block* block, bool* eos);
 
@@ -251,7 +255,8 @@ protected:
     std::unordered_map<std::thread::id, std::unique_ptr<ThreadClosure>> 
_local_closure;
 
     std::shared_ptr<pipeline::ExchangeDataDependency> _dependency = nullptr;
-    std::shared_ptr<pipeline::ChannelDependency> _channel_dependency = nullptr;
+    std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
+            _local_channel_dependency;
 };
 
 class VDataStreamRecvr::PipSenderQueue : public SenderQueue {
@@ -270,5 +275,6 @@ public:
 
     void add_block(Block* block, bool use_move) override;
 };
+
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index a09cb4b7d47..30a55f13296 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -65,7 +65,7 @@ enum CompressionTypePB : int;
 namespace pipeline {
 class ExchangeSinkOperator;
 class ExchangeSinkOperatorX;
-class ChannelDependency;
+class LocalExchangeChannelDependency;
 } // namespace pipeline
 
 namespace vectorized {
@@ -310,11 +310,6 @@ public:
 
     bool is_local() const { return _is_local; }
 
-    VDataStreamRecvr* local_recvr() {
-        DCHECK(_is_local && _local_recvr != nullptr);
-        return _local_recvr.get();
-    }
-
     virtual void ch_roll_pb_block();
 
     bool can_write() {
@@ -557,11 +552,12 @@ public:
         return _closure.get();
     }
 
-    void set_dependency(std::shared_ptr<pipeline::ChannelDependency> 
dependency) {
+    std::shared_ptr<pipeline::LocalExchangeChannelDependency> 
get_local_channel_dependency() {
         if (!Channel<Parent>::_local_recvr) {
             throw Exception(ErrorCode::INTERNAL_ERROR, "_local_recvr is null");
         }
-        Channel<Parent>::_local_recvr->set_dependency(dependency);
+        return Channel<Parent>::_local_recvr->get_local_channel_dependency(
+                Channel<Parent>::_parent->sender_id());
     }
 
 private:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to