This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 610054c77b9 [cherry-pick](branch-21) fix exchange of tablet shuffle send block error (#44102) (#44230)
610054c77b9 is described below

commit 610054c77b9d4a74fcb522d4451ab52139cce6e0
Author: zhangstar333 <zhangs...@selectdb.com>
AuthorDate: Tue Nov 19 17:31:06 2024 +0800

    [cherry-pick](branch-21) fix exchange of tablet shuffle send block error (#44102) (#44230)
    
    cherry-pick from master (#44102)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 51 ++++++++++---------------
 be/src/pipeline/exec/exchange_sink_operator.h   |  3 +-
 be/src/vec/sink/vrow_distribution.h             |  1 +
 3 files changed, 24 insertions(+), 31 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index b26c69ad560..ff8bcdd9236 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -119,6 +119,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
     _split_block_hash_compute_timer = ADD_TIMER(_profile, 
"SplitBlockHashComputeTime");
     _split_block_distribute_by_channel_timer =
             ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime");
+    _send_new_partition_timer = ADD_TIMER(_profile, "SendNewPartitionTime");
     _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", 
TUnit::UNIT, 1);
     _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", 
TUnit::UNIT, 1);
     _overall_throughput = _profile->add_derived_counter(
@@ -318,23 +319,14 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     return Status::OK();
 }
 
-Status ExchangeSinkLocalState::_send_new_partition_batch() {
-    if (_row_distribution.need_deal_batching()) { // maybe try_close more than 
1 time
-        RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
-        vectorized::Block tmp_block =
-                _row_distribution._batching_block->to_block(); // Borrow out, 
for lval ref
-        auto& p = _parent->cast<ExchangeSinkOperatorX>();
-        // these order is unique.
-        //  1. clear batching stats(and flag goes true) so that we won't make 
a new batching process in dealing batched block.
-        //  2. deal batched block
-        //  3. now reuse the column of lval block. cuz write doesn't real 
adjust it. it generate a new block from that.
-        _row_distribution.clear_batching_stats();
-        RETURN_IF_ERROR(p.sink(_state, &tmp_block, false));
-        // Recovery back
-        
_row_distribution._batching_block->set_mutable_columns(tmp_block.mutate_columns());
-        _row_distribution._batching_block->clear_column_data();
-        _row_distribution._deal_batched = false;
-    }
+Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block* 
input_block) {
+    RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
+    auto& p = _parent->cast<ExchangeSinkOperatorX>();
+    // Recovery back
+    _row_distribution.clear_batching_stats();
+    _row_distribution._batching_block->clear_column_data();
+    _row_distribution._deal_batched = false;
+    RETURN_IF_ERROR(p.sink(_state, input_block, false));
     return Status::OK();
 }
 
@@ -551,7 +543,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         }
     } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
         // check out of limit
-        RETURN_IF_ERROR(local_state._send_new_partition_batch());
         std::shared_ptr<vectorized::Block> convert_block = 
std::make_shared<vectorized::Block>();
         const auto& num_channels = local_state._partition_count;
         std::vector<std::vector<uint32>> channel2rows;
@@ -566,21 +557,21 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
             
RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
                     *block, convert_block, filtered_rows, has_filtered_rows,
                     local_state._row_part_tablet_ids, 
local_state._number_input_rows));
-
-            const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
-            const auto& tablet_ids = 
local_state._row_part_tablet_ids[0].tablet_ids;
-            for (int idx = 0; idx < row_ids.size(); ++idx) {
-                const auto& row = row_ids[idx];
-                const auto& tablet_id_hash =
-                        HashUtil::zlib_crc_hash(&tablet_ids[idx], 
sizeof(int64), 0);
-                channel2rows[tablet_id_hash % num_channels].emplace_back(row);
+            if (local_state._row_distribution.batching_rows() > 0) {
+                SCOPED_TIMER(local_state._send_new_partition_timer);
+                RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
+            } else {
+                const auto& row_ids = 
local_state._row_part_tablet_ids[0].row_ids;
+                const auto& tablet_ids = 
local_state._row_part_tablet_ids[0].tablet_ids;
+                for (int idx = 0; idx < row_ids.size(); ++idx) {
+                    const auto& row = row_ids[idx];
+                    const auto& tablet_id_hash =
+                            HashUtil::zlib_crc_hash(&tablet_ids[idx], 
sizeof(int64), 0);
+                    channel2rows[tablet_id_hash % 
num_channels].emplace_back(row);
+                }
             }
         }
 
-        if (eos) {
-            local_state._row_distribution._deal_batched = true;
-            RETURN_IF_ERROR(local_state._send_new_partition_batch());
-        }
         // the convert_block maybe different with block after execute exprs
         // when send data we still use block
         RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, 
num_channels,
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 8d9382dadd0..c055b131d8a 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -144,7 +144,7 @@ public:
     static Status empty_callback_function(void* sender, 
TCreatePartitionResult* result) {
         return Status::OK();
     }
-    Status _send_new_partition_batch();
+    Status _send_new_partition_batch(vectorized::Block* input_block);
     std::vector<vectorized::PipChannel<ExchangeSinkLocalState>*> channels;
     
std::vector<std::shared_ptr<vectorized::PipChannel<ExchangeSinkLocalState>>>
             channel_shared_ptrs;
@@ -179,6 +179,7 @@ private:
     // Used to counter send bytes under local data exchange
     RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
     RuntimeProfile::Counter* _merge_block_timer = nullptr;
+    RuntimeProfile::Counter* _send_new_partition_timer = nullptr;
 
     RuntimeProfile::Counter* _wait_queue_timer = nullptr;
     RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h
index 248982c0202..9e4cce6b528 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -131,6 +131,7 @@ public:
                                       std::vector<RowPartTabletIds>& 
row_part_tablet_ids,
                                       int64_t& rows_stat_val);
     bool need_deal_batching() const { return _deal_batched && _batching_rows > 
0; }
+    size_t batching_rows() const { return _batching_rows; }
     // create partitions when need for auto-partition table using 
#_partitions_need_create.
     Status automatic_create_partition();
     void clear_batching_stats();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to