zhangstar333 commented on code in PR #46262: URL: https://github.com/apache/doris/pull/46262#discussion_r1900696080
########## be/src/pipeline/exec/partition_sort_source_operator.cpp: ########## @@ -41,44 +41,45 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); output_block->clear_column_data(); + + auto get_data_from_blocks_buffer = false; { std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex); - if (local_state._shared_state->blocks_buffer.empty() == false) { + get_data_from_blocks_buffer = !local_state._shared_state->blocks_buffer.empty(); + if (get_data_from_blocks_buffer) { local_state._shared_state->blocks_buffer.front().swap(*output_block); local_state._shared_state->blocks_buffer.pop(); - //if buffer have no data and sink not eos, block reading and wait for signal again - RETURN_IF_ERROR(vectorized::VExprContext::filter_block( - local_state._conjuncts, output_block, output_block->columns())); + if (local_state._shared_state->blocks_buffer.empty() && - local_state._shared_state->sink_eos == false) { + !local_state._shared_state->sink_eos) { // add this mutex to check, as in some case maybe is doing block(), and the sink is doing set eos. // so have to hold mutex to set block(), avoid to sink have set eos and set ready, but here set block() by mistake std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock); - if (local_state._shared_state->sink_eos == false) { + if (!local_state._shared_state->sink_eos) { local_state._dependency->block(); } } - if (!output_block->empty()) { - local_state._num_rows_returned += output_block->rows(); - } - return Status::OK(); } } - // is_ready_for_read: this is set by sink node using: local_state._dependency->set_ready_for_read() - // notice: must output block from _blocks_buffer firstly, and then get_sorted_block. - // as when the child is eos, then set _can_read = true, and _partition_sorts have push_back sorter. 
- if we move the _blocks_buffer output at last(behind 286 line), - it's maybe eos but not output all data: when _blocks_buffer.empty() and _can_read = false (this: _sort_idx && _partition_sorts.size() are 0) - RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state)); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, - output_block->columns())); - { - std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex); + if (!get_data_from_blocks_buffer) { + // is_ready_for_read: this is set by sink node using: local_state._dependency->set_ready_for_read() + // notice: must output block from _blocks_buffer firstly, and then get_sorted_block. + // as when the child is eos, then set _can_read = true, and _partition_sorts have push_back sorter. + // if we move the _blocks_buffer output at last(behind 286 line), + // it's maybe eos but not output all data: when _blocks_buffer.empty() and _can_read = false (this: _sort_idx && _partition_sorts.size() are 0) + RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state)); + { + std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex); - *eos = local_state._shared_state->blocks_buffer.empty() && - local_state._sort_idx >= local_state._shared_state->partition_sorts.size(); + *eos = local_state._shared_state->blocks_buffer.empty() && + local_state._sort_idx >= local_state._shared_state->partition_sorts.size(); + } } + + //if buffer have no data and sink not eos, block reading and wait for signal again + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, Review Comment: This could be moved to line 84. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org