This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6c338e60cd0 [cherry-pick](branch-30) fix exchange of tablet shuffle send block error (#44102) (#44166)
6c338e60cd0 is described below

commit 6c338e60cd0e5fea31cac2e9efae71840a1942e2
Author: zhangstar333 <zhangs...@selectdb.com>
AuthorDate: Mon Nov 18 22:12:37 2024 +0800

    [cherry-pick](branch-30) fix exchange of tablet shuffle send block error (#44102) (#44166)
    
    cherry-pick from master  (#44102)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 51 ++++++++++---------------
 be/src/pipeline/exec/exchange_sink_operator.h   |  3 +-
 be/src/vec/sink/vrow_distribution.h             |  1 +
 3 files changed, 24 insertions(+), 31 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 2d79b0d8b2b..85c87df8f4d 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -58,6 +58,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
     _local_send_timer = ADD_TIMER(_profile, "LocalSendTime");
     _split_block_hash_compute_timer = ADD_TIMER(_profile, 
"SplitBlockHashComputeTime");
     _distribute_rows_into_channels_timer = ADD_TIMER(_profile, 
"DistributeRowsIntoChannelsTime");
+    _send_new_partition_timer = ADD_TIMER(_profile, "SendNewPartitionTime");
     _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", 
TUnit::UNIT, 1);
     _overall_throughput = _profile->add_derived_counter(
             "OverallThroughput", TUnit::BYTES_PER_SECOND,
@@ -276,23 +277,14 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     return Status::OK();
 }
 
-Status ExchangeSinkLocalState::_send_new_partition_batch() {
-    if (_row_distribution.need_deal_batching()) { // maybe try_close more than 
1 time
-        RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
-        vectorized::Block tmp_block =
-                _row_distribution._batching_block->to_block(); // Borrow out, 
for lval ref
-        auto& p = _parent->cast<ExchangeSinkOperatorX>();
-        // these order is unique.
-        //  1. clear batching stats(and flag goes true) so that we won't make 
a new batching process in dealing batched block.
-        //  2. deal batched block
-        //  3. now reuse the column of lval block. cuz write doesn't real 
adjust it. it generate a new block from that.
-        _row_distribution.clear_batching_stats();
-        RETURN_IF_ERROR(p.sink(_state, &tmp_block, false));
-        // Recovery back
-        
_row_distribution._batching_block->set_mutable_columns(tmp_block.mutate_columns());
-        _row_distribution._batching_block->clear_column_data();
-        _row_distribution._deal_batched = false;
-    }
+Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block* 
input_block) {
+    RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
+    auto& p = _parent->cast<ExchangeSinkOperatorX>();
+    // Recovery back
+    _row_distribution.clear_batching_stats();
+    _row_distribution._batching_block->clear_column_data();
+    _row_distribution._deal_batched = false;
+    RETURN_IF_ERROR(p.sink(_state, input_block, false));
     return Status::OK();
 }
 
@@ -512,7 +504,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
             old_channel_mem_usage += channel->mem_usage();
         }
         // check out of limit
-        RETURN_IF_ERROR(local_state._send_new_partition_batch());
         std::shared_ptr<vectorized::Block> convert_block = 
std::make_shared<vectorized::Block>();
         const auto& num_channels = local_state._partition_count;
         std::vector<std::vector<uint32>> channel2rows;
@@ -527,21 +518,21 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
             
RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
                     *block, convert_block, filtered_rows, has_filtered_rows,
                     local_state._row_part_tablet_ids, 
local_state._number_input_rows));
-
-            const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
-            const auto& tablet_ids = 
local_state._row_part_tablet_ids[0].tablet_ids;
-            for (int idx = 0; idx < row_ids.size(); ++idx) {
-                const auto& row = row_ids[idx];
-                const auto& tablet_id_hash =
-                        HashUtil::zlib_crc_hash(&tablet_ids[idx], 
sizeof(int64), 0);
-                channel2rows[tablet_id_hash % num_channels].emplace_back(row);
+            if (local_state._row_distribution.batching_rows() > 0) {
+                SCOPED_TIMER(local_state._send_new_partition_timer);
+                RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
+            } else {
+                const auto& row_ids = 
local_state._row_part_tablet_ids[0].row_ids;
+                const auto& tablet_ids = 
local_state._row_part_tablet_ids[0].tablet_ids;
+                for (int idx = 0; idx < row_ids.size(); ++idx) {
+                    const auto& row = row_ids[idx];
+                    const auto& tablet_id_hash =
+                            HashUtil::zlib_crc_hash(&tablet_ids[idx], 
sizeof(int64), 0);
+                    channel2rows[tablet_id_hash % 
num_channels].emplace_back(row);
+                }
             }
         }
 
-        if (eos) {
-            local_state._row_distribution._deal_batched = true;
-            RETURN_IF_ERROR(local_state._send_new_partition_batch());
-        }
         {
             SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
             // the convert_block maybe different with block after execute exprs
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index f0cabb1ffde..bee34ad1a85 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -96,7 +96,7 @@ public:
     static Status empty_callback_function(void* sender, 
TCreatePartitionResult* result) {
         return Status::OK();
     }
-    Status _send_new_partition_batch();
+    Status _send_new_partition_batch(vectorized::Block* input_block);
     std::vector<std::shared_ptr<vectorized::Channel>> channels;
     int current_channel_idx {0}; // index of current channel to send to if 
_random == true
     bool only_local_exchange {false};
@@ -127,6 +127,7 @@ private:
     // Used to counter send bytes under local data exchange
     RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
     RuntimeProfile::Counter* _merge_block_timer = nullptr;
+    RuntimeProfile::Counter* _send_new_partition_timer = nullptr;
 
     RuntimeProfile::Counter* _wait_queue_timer = nullptr;
     RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
diff --git a/be/src/vec/sink/vrow_distribution.h 
b/be/src/vec/sink/vrow_distribution.h
index 248982c0202..9e4cce6b528 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -131,6 +131,7 @@ public:
                                       std::vector<RowPartTabletIds>& 
row_part_tablet_ids,
                                       int64_t& rows_stat_val);
     bool need_deal_batching() const { return _deal_batched && _batching_rows > 
0; }
+    size_t batching_rows() const { return _batching_rows; }
     // create partitions when need for auto-partition table using 
#_partitions_need_create.
     Status automatic_create_partition();
     void clear_batching_stats();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to