This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new b185dfcbf67 [pick](branch-2.1) pick #41676 #41740 #41857 (#41904)
b185dfcbf67 is described below

commit b185dfcbf6782cd4d2b45c795161ad0a92c4841e
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue Oct 15 22:41:17 2024 +0800

    [pick](branch-2.1) pick #41676 #41740 #41857 (#41904)
    
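    Cherry-pick #41676, #41740 and #41857 into branch-2.1:
    - ExchangeSinkLocalState::open now shuffles channels (UNPARTITIONED / RANDOM /
      TABLE_SINK_RANDOM_PARTITIONED) before opening them. The index of the last
      local channel (_last_local_channel_idx) is therefore recorded against the
      final channel order.
    - Channel::send_local_block takes a can_be_moved flag that is forwarded to the
      local receiver's add_block. In the pipeline exchange sink, a broadcast block
      is moved only into the last local channel, and earlier channels get copies.
      Hash/random-partitioned sends pass true. VDataStreamSender and
      ResultFileSinkLocalState pass false.
    - LocalExchangeSinkOperatorX records _use_global_shuffle for HASH_SHUFFLE and
      reports it as the "UseGlobalShuffle" profile string and in debug_string.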
---
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 45 +++++++++++++++-------
 be/src/pipeline/exec/exchange_sink_operator.h      |  1 +
 be/src/pipeline/exec/result_file_sink_operator.cpp |  4 +-
 .../local_exchange_sink_operator.cpp               | 14 +++++--
 .../local_exchange/local_exchange_sink_operator.h  |  1 +
 be/src/vec/sink/vdata_stream_sender.cpp            | 14 +++----
 be/src/vec/sink/vdata_stream_sender.h              |  2 +-
 7 files changed, 55 insertions(+), 26 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 7584c0b0e45..ada5d5455b0 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -170,19 +170,20 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     _part_type = p._part_type;
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
+    if (_part_type == TPartitionType::UNPARTITIONED || _part_type == 
TPartitionType::RANDOM ||
+        _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
+        std::random_device rd;
+        std::mt19937 g(rd());
+        shuffle(channels.begin(), channels.end(), g);
+    }
     int local_size = 0;
     for (int i = 0; i < channels.size(); ++i) {
         RETURN_IF_ERROR(channels[i]->open(state));
         if (channels[i]->is_local()) {
             local_size++;
+            _last_local_channel_idx = i;
         }
     }
-    if (_part_type == TPartitionType::UNPARTITIONED || _part_type == 
TPartitionType::RANDOM ||
-        _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
-        std::random_device rd;
-        std::mt19937 g(rd());
-        shuffle(channels.begin(), channels.end(), g);
-    }
     only_local_exchange = local_size == channels.size();
 
     PUniqueId id;
@@ -446,11 +447,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         if (local_state.only_local_exchange) {
             if (!block->empty()) {
                 Status status;
+                size_t idx = 0;
                 for (auto* channel : local_state.channels) {
                     if (!channel->is_receiver_eof()) {
-                        status = channel->send_local_block(block);
+                        // If this channel is the last, we can move this block 
to downstream pipeline.
+                        // Otherwise, this block also need to be broadcasted 
to other channels so should be copied.
+                        DCHECK_GE(local_state._last_local_channel_idx, 0);
+                        status = channel->send_local_block(
+                                block, idx == 
local_state._last_local_channel_idx);
                         HANDLE_CHANNEL_STATUS(state, channel, status);
                     }
+                    idx++;
                 }
             }
         } else {
@@ -471,21 +478,33 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
                     } else {
                         block_holder->get_block()->Clear();
                     }
+                    size_t idx = 0;
+                    bool moved = false;
                     for (auto* channel : local_state.channels) {
                         if (!channel->is_receiver_eof()) {
                             Status status;
                             if (channel->is_local()) {
-                                status = channel->send_local_block(&cur_block);
+                                // If this channel is the last, we can move 
this block to downstream pipeline.
+                                // Otherwise, this block also need to be 
broadcasted to other channels so should be copied.
+                                DCHECK_GE(local_state._last_local_channel_idx, 
0);
+                                status = channel->send_local_block(
+                                        &cur_block, idx == 
local_state._last_local_channel_idx);
+                                moved = idx == 
local_state._last_local_channel_idx;
                             } else {
                                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                                 status = 
channel->send_broadcast_block(block_holder, eos);
                             }
                             HANDLE_CHANNEL_STATUS(state, channel, status);
                         }
+                        idx++;
+                    }
+                    if (moved) {
+                        local_state._serializer.reset_block();
+                    } else {
+                        cur_block.clear_column_data();
+                        
local_state._serializer.get_block()->set_mutable_columns(
+                                cur_block.mutate_columns());
                     }
-                    cur_block.clear_column_data();
-                    local_state._serializer.get_block()->set_mutable_columns(
-                            cur_block.mutate_columns());
                 }
             }
         }
@@ -496,7 +515,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         if (!current_channel->is_receiver_eof()) {
             // 2. serialize, send and rollover block
             if (current_channel->is_local()) {
-                auto status = current_channel->send_local_block(block);
+                auto status = current_channel->send_local_block(block, true);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -582,7 +601,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         if (!current_channel->is_receiver_eof()) {
             // 2. serialize, send and rollover block
             if (current_channel->is_local()) {
-                auto status = current_channel->send_local_block(block);
+                auto status = current_channel->send_local_block(block, true);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index a94392b906d..aeb6a1503b7 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -234,6 +234,7 @@ private:
     // for external table sink hash partition
     std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
     std::atomic<bool> _reach_limit = false;
+    int _last_local_channel_idx = -1;
 };
 
 class ExchangeSinkOperatorX final : public 
DataSinkOperatorX<ExchangeSinkLocalState> {
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp 
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 4ed414a0774..6fe0b7f9e25 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -210,7 +210,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, 
Status exec_status)
                     Status status;
                     for (auto channel : _channels) {
                         if (!channel->is_receiver_eof()) {
-                            status = 
channel->send_local_block(_output_block.get());
+                            status = 
channel->send_local_block(_output_block.get(), false);
                             HANDLE_CHANNEL_STATUS(state, channel, status);
                         }
                     }
@@ -234,7 +234,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, 
Status exec_status)
                         for (auto channel : _channels) {
                             if (!channel->is_receiver_eof()) {
                                 if (channel->is_local()) {
-                                    status = 
channel->send_local_block(&cur_block);
+                                    status = 
channel->send_local_block(&cur_block, false);
                                 } else {
                                     
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                                     status = 
channel->send_broadcast_block(_block_holder, true);
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index 78c2761dcc7..c6f675f3c1b 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -27,6 +27,11 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
     SCOPED_TIMER(_init_timer);
     _compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
     _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
+    if (_parent->cast<LocalExchangeSinkOperatorX>()._type == 
ExchangeType::HASH_SHUFFLE) {
+        _profile->add_info_string(
+                "UseGlobalShuffle",
+                
std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle));
+    }
     _channel_id = info.task_idx;
     return Status::OK();
 }
@@ -61,10 +66,12 @@ Status LocalExchangeSinkLocalState::close(RuntimeState* 
state, Status exec_statu
 std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) 
const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
-                   "{}, _channel_id: {}, _num_partitions: {}, _num_senders: 
{}, _num_sources: {}, "
+                   "{}, _use_global_shuffle: {}, _channel_id: {}, 
_num_partitions: {}, "
+                   "_num_senders: {}, _num_sources: {}, "
                    "_running_sink_operators: {}, _running_source_operators: 
{}, _release_count: {}",
-                   Base::debug_string(indentation_level), _channel_id, 
_exchanger->_num_partitions,
-                   _exchanger->_num_senders, _exchanger->_num_sources,
+                   Base::debug_string(indentation_level),
+                   
_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle, _channel_id,
+                   _exchanger->_num_partitions, _exchanger->_num_senders, 
_exchanger->_num_sources,
                    _exchanger->_running_sink_operators, 
_exchanger->_running_source_operators,
                    _release_count);
     return fmt::to_string(debug_string_buffer);
@@ -76,6 +83,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, 
const int num_buckets
     _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + 
")";
     _type = type;
     if (_type == ExchangeType::HASH_SHUFFLE) {
+        _use_global_shuffle = should_disable_bucket_shuffle;
         // For shuffle join, if data distribution has been broken by previous 
operator, we
         // should use a HASH_SHUFFLE local exchanger to shuffle data again. To 
be mentioned,
         // we should use map shuffle idx to instance idx because all instances 
will be
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index f1d60fc03c4..9b72402abce 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -125,6 +125,7 @@ private:
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
     const std::map<int, int> _bucket_seq_to_instance_idx;
     std::vector<std::pair<int, int>> _shuffle_idx_to_instance_idx;
+    bool _use_global_shuffle = false;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 44124ea7954..394005f6adf 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -231,7 +231,7 @@ Status Channel<Parent>::send_local_block(Status 
exec_status, bool eos) {
 }
 
 template <typename Parent>
-Status Channel<Parent>::send_local_block(Block* block) {
+Status Channel<Parent>::send_local_block(Block* block, bool can_be_moved) {
     SCOPED_TIMER(_parent->local_send_timer());
     if (_recvr_is_valid()) {
         if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, 
Parent>) {
@@ -239,7 +239,7 @@ Status Channel<Parent>::send_local_block(Block* block) {
             COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
             COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
         }
-        _local_recvr->add_block(block, _parent->sender_id(), false);
+        _local_recvr->add_block(block, _parent->sender_id(), can_be_moved);
         return Status::OK();
     } else {
         return _receiver_status;
@@ -646,7 +646,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block, bool eos) {
                 Status status;
                 for (auto channel : _channels) {
                     if (!channel->is_receiver_eof()) {
-                        status = channel->send_local_block(block);
+                        status = channel->send_local_block(block, false);
                         HANDLE_CHANNEL_STATUS(state, channel, status);
                     }
                 }
@@ -671,7 +671,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block, bool eos) {
                     for (auto channel : _channels) {
                         if (!channel->is_receiver_eof()) {
                             if (channel->is_local()) {
-                                status = channel->send_local_block(&cur_block);
+                                status = channel->send_local_block(&cur_block, 
false);
                             } else {
                                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                                 status = 
channel->send_broadcast_block(block_holder, eos);
@@ -698,7 +698,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block, bool eos) {
                 for (auto channel : _channels) {
                     if (!channel->is_receiver_eof()) {
                         if (channel->is_local()) {
-                            status = channel->send_local_block(&cur_block);
+                            status = channel->send_local_block(&cur_block, 
false);
                         } else {
                             SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                             status = channel->send_remote_block(_cur_pb_block, 
false);
@@ -717,7 +717,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block, bool eos) {
         if (!current_channel->is_receiver_eof()) {
             // 2. serialize, send and rollover block
             if (current_channel->is_local()) {
-                auto status = current_channel->send_local_block(block);
+                auto status = current_channel->send_local_block(block, false);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -829,7 +829,7 @@ Status VDataStreamSender::close(RuntimeState* state, Status 
exec_status) {
                 for (auto channel : _channels) {
                     if (!channel->is_receiver_eof()) {
                         if (channel->is_local()) {
-                            status = channel->send_local_block(&block);
+                            status = channel->send_local_block(&block, false);
                         } else {
                             SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                             status = channel->send_remote_block(_cur_pb_block, 
false);
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index b9462434f07..92344b994e0 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -307,7 +307,7 @@ public:
 
     Status send_local_block(Status exec_status, bool eos = false);
 
-    Status send_local_block(Block* block);
+    Status send_local_block(Block* block, bool can_be_moved);
     // Flush buffered rows and close channel. This function don't wait the 
response
     // of close operation, client should call close_wait() to finish channel's 
close.
     // We split one close operation into two phases in order to make multiple 
channels


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to