yiguolei commented on code in PR #45375: URL: https://github.com/apache/doris/pull/45375#discussion_r1946341203
########## be/src/pipeline/local_exchange/local_exchanger.cpp: ########## @@ -27,129 +28,193 @@ namespace doris::pipeline { #include "common/compile_check_begin.h" -template <typename BlockType> -void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id, - LocalExchangeSinkLocalState* local_state, - BlockType&& block) { + +template <typename QueueType> +void Exchanger<QueueType>::_enqueue_data_and_set_ready( + RuntimeProfile::Counter* enqueue_rows_counter, int channel_id, + LocalExchangeSinkLocalState* local_state, typename QueueType::BlockType&& block) { if (local_state == nullptr) { - _enqueue_data_and_set_ready(channel_id, std::move(block)); + _enqueue_data_and_set_ready(enqueue_rows_counter, channel_id, std::move(block)); return; } size_t allocated_bytes = 0; + int64_t rows = 0; // PartitionedBlock is used by shuffle exchanger. // PartitionedBlock will be push into multiple queues with different row ranges, so it will be // referenced multiple times. Otherwise, we only ref the block once because it is only push into // one queue. 
- if constexpr (std::is_same_v<PartitionedBlock, BlockType> || - std::is_same_v<BroadcastBlock, BlockType>) { + if constexpr (std::is_same_v<PartitionedBlock, typename QueueType::BlockType> || + std::is_same_v<BroadcastBlock, typename QueueType::BlockType>) { allocated_bytes = block.first->data_block.allocated_bytes(); + rows = block.second.length; } else { block->ref(1); allocated_bytes = block->data_block.allocated_bytes(); + rows = block->data_block.rows(); } std::unique_lock l(*_m[channel_id]); - local_state->_shared_state->add_mem_usage(channel_id, allocated_bytes, - !std::is_same_v<PartitionedBlock, BlockType> && - !std::is_same_v<BroadcastBlock, BlockType>); - if (_data_queue[channel_id].enqueue(std::move(block))) { + local_state->_shared_state->add_mem_usage( + channel_id, allocated_bytes, + !std::is_same_v<PartitionedBlock, typename QueueType::BlockType> && + !std::is_same_v<BroadcastBlock, typename QueueType::BlockType>); + if (_data_queue[channel_id]->enqueue(std::move(block))) { + COUNTER_UPDATE(enqueue_rows_counter, rows); local_state->_shared_state->set_ready_to_read(channel_id); } else { local_state->_shared_state->sub_mem_usage(channel_id, allocated_bytes); // `enqueue(block)` return false iff this queue's source operator is already closed so we // just unref the block. 
- if constexpr (std::is_same_v<PartitionedBlock, BlockType> || - std::is_same_v<BroadcastBlock, BlockType>) { - block.first->unref(local_state->_shared_state, allocated_bytes, channel_id); + if constexpr (std::is_same_v<PartitionedBlock, typename QueueType::BlockType> || + std::is_same_v<BroadcastBlock, typename QueueType::BlockType>) { + block.first->unref(this, local_state->_shared_state, allocated_bytes, channel_id); } else { - block->unref(local_state->_shared_state, allocated_bytes, channel_id); + block->unref(this, local_state->_shared_state, allocated_bytes, channel_id); DCHECK_EQ(block->ref_value(), 0); } } } -template <typename BlockType> -bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState* local_state, - BlockType& block, bool* eos, vectorized::Block* data_block, - int channel_id) { +template <typename QueueType> +bool Exchanger<QueueType>::_dequeue_data(RuntimeProfile::Counter* dequeue_rows_counter, + RuntimeProfile::Counter* get_block_failed_counter, + LocalExchangeSourceLocalState* local_state, + typename QueueType::BlockType& block, bool* eos, + vectorized::Block* data_block, int channel_id) { if (local_state == nullptr) { - if (!_dequeue_data(block, eos, data_block, channel_id)) { - throw Exception(ErrorCode::INTERNAL_ERROR, "Exchanger has no data: {}", - data_queue_debug_string(channel_id)); + return _dequeue_data(dequeue_rows_counter, get_block_failed_counter, block, eos, data_block, + channel_id); + } + int64_t rows = 0; + Defer defer {[&]() { + if (rows > 0) { + if (dequeue_rows_counter) { + COUNTER_UPDATE(dequeue_rows_counter, rows); + } + } else { + if (get_block_failed_counter) { + COUNTER_UPDATE(get_block_failed_counter, 1); + } } - return true; - } + }}; bool all_finished = _running_sink_operators == 0; - if (_data_queue[channel_id].try_dequeue(block)) { - if constexpr (std::is_same_v<PartitionedBlock, BlockType> || - std::is_same_v<BroadcastBlock, BlockType>) { + if (_data_queue[channel_id]->try_dequeue(block)) { + if 
constexpr (std::is_same_v<PartitionedBlock, typename QueueType::BlockType> || + std::is_same_v<BroadcastBlock, typename QueueType::BlockType>) { + rows = block.second.length; + } else { + rows = block->data_block.rows(); + } + + if constexpr (std::is_same_v<PartitionedBlock, typename QueueType::BlockType> || + std::is_same_v<BroadcastBlock, typename QueueType::BlockType>) { local_state->_shared_state->sub_mem_usage(channel_id, block.first->data_block.allocated_bytes()); } else { local_state->_shared_state->sub_mem_usage(channel_id, block->data_block.allocated_bytes()); data_block->swap(block->data_block); - block->unref(local_state->_shared_state, data_block->allocated_bytes(), channel_id); + block->unref(this, local_state->_shared_state, data_block->allocated_bytes(), + channel_id); DCHECK_EQ(block->ref_value(), 0); } return true; } else if (all_finished) { *eos = true; } else { std::unique_lock l(*_m[channel_id]); - if (_data_queue[channel_id].try_dequeue(block)) { - if constexpr (std::is_same_v<PartitionedBlock, BlockType> || - std::is_same_v<BroadcastBlock, BlockType>) { + if (_data_queue[channel_id]->try_dequeue(block)) { Review Comment: 又或者说，未来优化器要规划 local shuffle 这个算子，那么这个 local shuffle 的算子，跟我们现在的 exchange 算子是不是应该是同一个？ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org