This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 610054c77b9 [cherry-pick](branch-21) fix exchange of tablet shuffle send block error (#44102) (#44230)
610054c77b9 is described below

commit 610054c77b9d4a74fcb522d4451ab52139cce6e0
Author: zhangstar333 <zhangs...@selectdb.com>
AuthorDate: Tue Nov 19 17:31:06 2024 +0800

    [cherry-pick](branch-21) fix exchange of tablet shuffle send block error (#44102) (#44230)

    cherry-pick from master (#44102)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 51 ++++++++++---------------
 be/src/pipeline/exec/exchange_sink_operator.h   |  3 +-
 be/src/vec/sink/vrow_distribution.h             |  1 +
 3 files changed, 24 insertions(+), 31 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index b26c69ad560..ff8bcdd9236 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -119,6 +119,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     _split_block_hash_compute_timer = ADD_TIMER(_profile, "SplitBlockHashComputeTime");
     _split_block_distribute_by_channel_timer =
             ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime");
+    _send_new_partition_timer = ADD_TIMER(_profile, "SendNewPartitionTime");
     _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
     _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
     _overall_throughput = _profile->add_derived_counter(
@@ -318,23 +319,14 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     return Status::OK();
 }
 
-Status ExchangeSinkLocalState::_send_new_partition_batch() {
-    if (_row_distribution.need_deal_batching()) { // maybe try_close more than 1 time
-        RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
-        vectorized::Block tmp_block =
-                _row_distribution._batching_block->to_block(); // Borrow out, for lval ref
-        auto& p = _parent->cast<ExchangeSinkOperatorX>();
-        // these order is unique.
-        // 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
-        // 2. deal batched block
-        // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
-        _row_distribution.clear_batching_stats();
-        RETURN_IF_ERROR(p.sink(_state, &tmp_block, false));
-        // Recovery back
-        _row_distribution._batching_block->set_mutable_columns(tmp_block.mutate_columns());
-        _row_distribution._batching_block->clear_column_data();
-        _row_distribution._deal_batched = false;
-    }
+Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block* input_block) {
+    RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
+    auto& p = _parent->cast<ExchangeSinkOperatorX>();
+    // Recovery back
+    _row_distribution.clear_batching_stats();
+    _row_distribution._batching_block->clear_column_data();
+    _row_distribution._deal_batched = false;
+    RETURN_IF_ERROR(p.sink(_state, input_block, false));
     return Status::OK();
 }
 
@@ -551,7 +543,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
         }
     } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
         // check out of limit
-        RETURN_IF_ERROR(local_state._send_new_partition_batch());
         std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>();
         const auto& num_channels = local_state._partition_count;
         std::vector<std::vector<uint32>> channel2rows;
@@ -566,21 +557,21 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
             RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
                     *block, convert_block, filtered_rows, has_filtered_rows,
                     local_state._row_part_tablet_ids, local_state._number_input_rows));
-
-            const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
-            const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids;
-            for (int idx = 0; idx < row_ids.size(); ++idx) {
-                const auto& row = row_ids[idx];
-                const auto& tablet_id_hash =
-                        HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0);
-                channel2rows[tablet_id_hash % num_channels].emplace_back(row);
+            if (local_state._row_distribution.batching_rows() > 0) {
+                SCOPED_TIMER(local_state._send_new_partition_timer);
+                RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
+            } else {
+                const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
+                const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids;
+                for (int idx = 0; idx < row_ids.size(); ++idx) {
+                    const auto& row = row_ids[idx];
+                    const auto& tablet_id_hash =
+                            HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0);
+                    channel2rows[tablet_id_hash % num_channels].emplace_back(row);
+                }
             }
         }
-        if (eos) {
-            local_state._row_distribution._deal_batched = true;
-            RETURN_IF_ERROR(local_state._send_new_partition_batch());
-        }
 
         // the convert_block maybe different with block after execute exprs
         // when send data we still use block
         RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels,
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 8d9382dadd0..c055b131d8a 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -144,7 +144,7 @@ public:
     static Status empty_callback_function(void* sender, TCreatePartitionResult* result) {
         return Status::OK();
     }
-    Status _send_new_partition_batch();
+    Status _send_new_partition_batch(vectorized::Block* input_block);
 
     std::vector<vectorized::PipChannel<ExchangeSinkLocalState>*> channels;
     std::vector<std::shared_ptr<vectorized::PipChannel<ExchangeSinkLocalState>>>
             channel_shared_ptrs;
@@ -179,6 +179,7 @@ private:
     // Used to counter send bytes under local data exchange
     RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
     RuntimeProfile::Counter* _merge_block_timer = nullptr;
+    RuntimeProfile::Counter* _send_new_partition_timer = nullptr;
 
     RuntimeProfile::Counter* _wait_queue_timer = nullptr;
    RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h
index 248982c0202..9e4cce6b528 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -131,6 +131,7 @@ public:
                                       std::vector<RowPartTabletIds>& row_part_tablet_ids,
                                       int64_t& rows_stat_val);
     bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; }
+    size_t batching_rows() const { return _batching_rows; }
     // create partitions when need for auto-partition table using #_partitions_need_create.
    Status automatic_create_partition();
     void clear_batching_stats();
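For readers who want the shape of the fix without walking the whole diff: the patched TABLET_SINK_SHUFFLE path now either (a) creates the missing auto-partitions and re-sinks the whole input block when any rows were batched, or (b) buckets rows to channels by hashing their tablet ids. The snippet below is a minimal, standalone C++ sketch of that decision, not Doris code: Row, create_missing_partitions and route_block are made-up names, and std::hash stands in for HashUtil::zlib_crc_hash.

// Standalone sketch of the "create partitions then resend whole block, else
// bucket by tablet-id hash" decision. All names here are illustrative.
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

struct Row {
    int64_t tablet_id; // < 0 means "target partition not created yet" (a batched row)
    int64_t value;
};

// Stand-in for the CRC-based tablet-id hash used by the real operator.
uint32_t hash_tablet_id(int64_t tablet_id) {
    return static_cast<uint32_t>(std::hash<int64_t>{}(tablet_id));
}

// Pretend to create the missing auto-partitions and assign real tablet ids.
void create_missing_partitions(std::vector<Row>& block) {
    int64_t next_tablet_id = 100;
    for (auto& row : block) {
        if (row.tablet_id < 0) {
            row.tablet_id = next_tablet_id++;
        }
    }
}

// If any row still waits on a new partition, create the partitions first and
// then route the whole block; otherwise bucket rows to channels directly.
std::vector<std::vector<size_t>> route_block(std::vector<Row>& block, size_t num_channels) {
    bool has_batching_rows = false;
    for (const auto& row : block) {
        if (row.tablet_id < 0) {
            has_batching_rows = true;
            break;
        }
    }
    if (has_batching_rows) {
        create_missing_partitions(block);
    }
    std::vector<std::vector<size_t>> channel2rows(num_channels);
    for (size_t idx = 0; idx < block.size(); ++idx) {
        channel2rows[hash_tablet_id(block[idx].tablet_id) % num_channels].push_back(idx);
    }
    return channel2rows;
}

int main() {
    std::vector<Row> block = {{10, 1}, {-1, 2}, {11, 3}, {-1, 4}};
    auto channel2rows = route_block(block, 3);
    for (size_t ch = 0; ch < channel2rows.size(); ++ch) {
        std::cout << "channel " << ch << ": " << channel2rows[ch].size() << " rows\n";
    }
    return 0;
}

The sketch only shows the routing decision; the actual patch additionally resets the row distribution's batching state (clear_batching_stats, clear_column_data, _deal_batched = false) before re-sinking the original block, which replaces the old borrow-and-restore handling of the batching block.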