Gabriel39 commented on code in PR #34557: URL: https://github.com/apache/doris/pull/34557#discussion_r1594888265
########## be/src/pipeline/local_exchange/local_exchanger.cpp: ########## @@ -46,36 +46,31 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block auto get_data = [&](vectorized::Block* result_block) { do { - const auto* offset_start = &(( - *std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]); + const auto* offset_start = partitioned_block.second.row_idxs->data() + + partitioned_block.second.offset_start; auto block_wrapper = partitioned_block.first; local_state._shared_state->sub_mem_usage( local_state._channel_id, block_wrapper->data_block.allocated_bytes(), false); mutable_block->add_rows(&block_wrapper->data_block, offset_start, - offset_start + std::get<2>(partitioned_block.second)); + offset_start + partitioned_block.second.length); block_wrapper->unref(local_state._shared_state); } while (mutable_block->rows() < state->batch_size() && _data_queue[local_state._channel_id].try_dequeue(partitioned_block)); *result_block = mutable_block->to_block(); }; - if (_running_sink_operators == 0) { - if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { - SCOPED_TIMER(local_state._copy_data_timer); - mutable_block = vectorized::MutableBlock::create_unique( - partitioned_block.first->data_block.clone_empty()); - get_data(block); - } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - *eos = true; - } - } else if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { + + if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { Review Comment: Assume the following events occur in order: 1. The source operator's call to `_data_queue[local_state._channel_id].try_dequeue(partitioned_block)` returns false. 2. The last sink operator pushes the last block into `_data_queue`. 3. The last sink operator decrements `_running_sink_operators` to zero. 4. The source operator then checks `_running_sink_operators == 0` and finds it true. As a result, eos is set to true because of event 4.
So the last block is ignored and the results will be incorrect. ########## be/src/pipeline/local_exchange/local_exchanger.cpp: ########## @@ -186,23 +180,17 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - if (_running_sink_operators == 0) { - if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { - block->swap(next_block); - _free_blocks.enqueue(std::move(next_block)); - local_state._shared_state->sub_mem_usage(local_state._channel_id, - block->allocated_bytes()); - } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - *eos = true; - } - } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { Review Comment: ditto ########## be/src/pipeline/local_exchange/local_exchanger.cpp: ########## @@ -361,23 +343,17 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized:: bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - if (_running_sink_operators == 0) { - if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { - block->swap(next_block); - _free_blocks.enqueue(std::move(next_block)); - local_state._shared_state->sub_mem_usage(local_state._channel_id, - block->allocated_bytes()); - } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - *eos = true; - } - } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org