HappenLee commented on code in PR #9314: URL: https://github.com/apache/incubator-doris/pull/9314#discussion_r866462928
########## be/src/vec/exec/vbroker_scan_node.cpp: ########## @@ -61,43 +61,69 @@ Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block, return Status::OK(); } - std::shared_ptr<vectorized::Block> scanner_block; - { - std::unique_lock<std::mutex> l(_batch_queue_lock); - while (_process_status.ok() && !_runtime_state->is_cancelled() && - _num_running_scanners > 0 && _block_queue.empty()) { - SCOPED_TIMER(_wait_scanner_timer); - _queue_reader_cond.wait_for(l, std::chrono::seconds(1)); - } - if (!_process_status.ok()) { - // Some scanner process failed. - return _process_status; + const int batch_size = _runtime_state->batch_size(); + while (true) { + std::shared_ptr<vectorized::Block> scanner_block; + { + std::unique_lock<std::mutex> l(_batch_queue_lock); + while (_process_status.ok() && !_runtime_state->is_cancelled() && + _num_running_scanners > 0 && _block_queue.empty()) { + SCOPED_TIMER(_wait_scanner_timer); + _queue_reader_cond.wait_for(l, std::chrono::seconds(1)); + } + if (!_process_status.ok()) { + // Some scanner process failed. 
+ return _process_status; + } + if (_runtime_state->is_cancelled()) { + if (update_status(Status::Cancelled("Cancelled"))) { + _queue_writer_cond.notify_all(); + } + return _process_status; + } + if (!_block_queue.empty()) { + scanner_block = _block_queue.front(); + _block_queue.pop_front(); + } } - if (_runtime_state->is_cancelled()) { - if (update_status(Status::Cancelled("Cancelled"))) { - _queue_writer_cond.notify_all(); + + // All scanner has been finished, and all cached batch has been read + if (scanner_block == nullptr || scanner_block.get() == nullptr) { + if (_mutable_block.get() != nullptr && !_mutable_block->empty()) { + *block = _mutable_block->to_block(); + reached_limit(block, eos); + LOG_IF(INFO, *eos) << "VBrokerScanNode ReachedLimit."; } - return _process_status; + _scan_finished.store(true); + *eos = true; + return Status::OK(); } - if (!_block_queue.empty()) { - scanner_block = _block_queue.front(); - _block_queue.pop_front(); + // notify one scanner + _queue_writer_cond.notify_one(); + + if (_mutable_block.get() == nullptr) { Review Comment: This condition is unlikely to be true; this line should be moved up. ########## be/src/vec/exec/vbroker_scan_node.cpp: ########## @@ -61,43 +61,69 @@ Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block, return Status::OK(); } - std::shared_ptr<vectorized::Block> scanner_block; - { - std::unique_lock<std::mutex> l(_batch_queue_lock); - while (_process_status.ok() && !_runtime_state->is_cancelled() && - _num_running_scanners > 0 && _block_queue.empty()) { - SCOPED_TIMER(_wait_scanner_timer); - _queue_reader_cond.wait_for(l, std::chrono::seconds(1)); - } - if (!_process_status.ok()) { - // Some scanner process failed. 
- return _process_status; + const int batch_size = _runtime_state->batch_size(); + while (true) { + std::shared_ptr<vectorized::Block> scanner_block; + { + std::unique_lock<std::mutex> l(_batch_queue_lock); + while (_process_status.ok() && !_runtime_state->is_cancelled() && + _num_running_scanners > 0 && _block_queue.empty()) { + SCOPED_TIMER(_wait_scanner_timer); + _queue_reader_cond.wait_for(l, std::chrono::seconds(1)); + } + if (!_process_status.ok()) { + // Some scanner process failed. + return _process_status; + } + if (_runtime_state->is_cancelled()) { + if (update_status(Status::Cancelled("Cancelled"))) { + _queue_writer_cond.notify_all(); + } + return _process_status; + } + if (!_block_queue.empty()) { + scanner_block = _block_queue.front(); + _block_queue.pop_front(); + } } - if (_runtime_state->is_cancelled()) { - if (update_status(Status::Cancelled("Cancelled"))) { - _queue_writer_cond.notify_all(); + + // All scanner has been finished, and all cached batch has been read + if (scanner_block == nullptr || scanner_block.get() == nullptr) { Review Comment: `scanner_block == nullptr || scanner_block.get() == nullptr` is redundant; only one of the two checks is needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org