This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6c338e60cd0 [cherry-pick](branch-30) fix exchange of tablet shuffle send block error (#44102) (#44166)
6c338e60cd0 is described below

commit 6c338e60cd0e5fea31cac2e9efae71840a1942e2
Author: zhangstar333 <zhangs...@selectdb.com>
AuthorDate: Mon Nov 18 22:12:37 2024 +0800

    [cherry-pick](branch-30) fix exchange of tablet shuffle send block error (#44102) (#44166)

    cherry-pick from master (#44102)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 51 ++++++++++---------------
 be/src/pipeline/exec/exchange_sink_operator.h   |  3 +-
 be/src/vec/sink/vrow_distribution.h             |  1 +
 3 files changed, 24 insertions(+), 31 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 2d79b0d8b2b..85c87df8f4d 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -58,6 +58,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     _local_send_timer = ADD_TIMER(_profile, "LocalSendTime");
     _split_block_hash_compute_timer = ADD_TIMER(_profile, "SplitBlockHashComputeTime");
     _distribute_rows_into_channels_timer = ADD_TIMER(_profile, "DistributeRowsIntoChannelsTime");
+    _send_new_partition_timer = ADD_TIMER(_profile, "SendNewPartitionTime");
     _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
     _overall_throughput = _profile->add_derived_counter(
             "OverallThroughput", TUnit::BYTES_PER_SECOND,
@@ -276,23 +277,14 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     return Status::OK();
 }
 
-Status ExchangeSinkLocalState::_send_new_partition_batch() {
-    if (_row_distribution.need_deal_batching()) { // maybe try_close more than 1 time
-        RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
-        vectorized::Block tmp_block =
-                _row_distribution._batching_block->to_block(); // Borrow out, for lval ref
-        auto& p = _parent->cast<ExchangeSinkOperatorX>();
-        // these order is unique.
-        // 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
-        // 2. deal batched block
-        // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
-        _row_distribution.clear_batching_stats();
-        RETURN_IF_ERROR(p.sink(_state, &tmp_block, false));
-        // Recovery back
-        _row_distribution._batching_block->set_mutable_columns(tmp_block.mutate_columns());
-        _row_distribution._batching_block->clear_column_data();
-        _row_distribution._deal_batched = false;
-    }
+Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block* input_block) {
+    RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
+    auto& p = _parent->cast<ExchangeSinkOperatorX>();
+    // Recovery back
+    _row_distribution.clear_batching_stats();
+    _row_distribution._batching_block->clear_column_data();
+    _row_distribution._deal_batched = false;
+    RETURN_IF_ERROR(p.sink(_state, input_block, false));
     return Status::OK();
 }
 
@@ -512,7 +504,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
             old_channel_mem_usage += channel->mem_usage();
         }
         // check out of limit
-        RETURN_IF_ERROR(local_state._send_new_partition_batch());
         std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>();
         const auto& num_channels = local_state._partition_count;
         std::vector<std::vector<uint32>> channel2rows;
@@ -527,21 +518,21 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
             RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
                     *block, convert_block, filtered_rows, has_filtered_rows,
                     local_state._row_part_tablet_ids, local_state._number_input_rows));
-
-            const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
-            const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids;
-            for (int idx = 0; idx < row_ids.size(); ++idx) {
-                const auto& row = row_ids[idx];
-                const auto& tablet_id_hash =
-                        HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0);
-                channel2rows[tablet_id_hash % num_channels].emplace_back(row);
+            if (local_state._row_distribution.batching_rows() > 0) {
+                SCOPED_TIMER(local_state._send_new_partition_timer);
+                RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
+            } else {
+                const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
+                const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids;
+                for (int idx = 0; idx < row_ids.size(); ++idx) {
+                    const auto& row = row_ids[idx];
+                    const auto& tablet_id_hash =
+                            HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0);
+                    channel2rows[tablet_id_hash % num_channels].emplace_back(row);
+                }
             }
         }
 
-        if (eos) {
-            local_state._row_distribution._deal_batched = true;
-            RETURN_IF_ERROR(local_state._send_new_partition_batch());
-        }
         {
             SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
             // the convert_block maybe different with block after execute exprs
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index f0cabb1ffde..bee34ad1a85 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -96,7 +96,7 @@ public:
     static Status empty_callback_function(void* sender, TCreatePartitionResult* result) {
         return Status::OK();
     }
-    Status _send_new_partition_batch();
+    Status _send_new_partition_batch(vectorized::Block* input_block);
     std::vector<std::shared_ptr<vectorized::Channel>> channels;
     int current_channel_idx {0}; // index of current channel to send to if _random == true
     bool only_local_exchange {false};
@@ -127,6 +127,7 @@ private:
     // Used to counter send bytes under local data exchange
     RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
     RuntimeProfile::Counter* _merge_block_timer = nullptr;
+    RuntimeProfile::Counter* _send_new_partition_timer = nullptr;
     RuntimeProfile::Counter* _wait_queue_timer = nullptr;
     RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
 
diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h
index 248982c0202..9e4cce6b528 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -131,6 +131,7 @@ public:
                                       std::vector<RowPartTabletIds>& row_part_tablet_ids,
                                       int64_t& rows_stat_val);
     bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; }
+    size_t batching_rows() const { return _batching_rows; }
     // create partitions when need for auto-partition table using #_partitions_need_create.
     Status automatic_create_partition();
     void clear_batching_stats();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org