yiguolei commented on code in PR #47462: URL: https://github.com/apache/doris/pull/47462#discussion_r1966504919
########## be/src/pipeline/pipeline_task.cpp: ########## @@ -340,36 +370,127 @@ Status PipelineTask::execute(bool* eos) { _block->clear_column_data(_root->row_desc().num_materialized_slots()); auto* block = _block.get(); - auto sink_revocable_mem_size = _sink->revocable_mem_size(_state); - if (should_revoke_memory(_state, sink_revocable_mem_size)) { - RETURN_IF_ERROR(_sink->revoke_memory(_state)); - continue; - } DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", { Status status = Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task executing failed"); return status; }); + // `_sink->is_finished(_state)` means sink operator should be finished if (_sink->is_finished(_state)) { set_wake_up_and_dep_ready(); } // `_dry_run` means sink operator need no more data *eos = wake_up_early() || _dry_run; - if (!*eos) { + auto workload_group = _state->get_query_ctx()->workload_group(); + if (*eos) { + _pending_block.reset(); + } else if (_pending_block) [[unlikely]] { + LOG(INFO) << "Query: " << print_id(query_id) + << " has pending block, size: " << _pending_block->allocated_bytes(); + _block = std::move(_pending_block); + block = _block.get(); + *eos = _pending_eos; + } else { SCOPED_TIMER(_get_block_timer); + if (_state->low_memory_mode()) { + _sink->set_low_memory_mode(_state); + _root->set_low_memory_mode(_state); + } + DEFER_RELEASE_RESERVED(); _get_block_counter->update(1); - RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos)); - } + const auto reserve_size = _root->get_reserve_mem_size(_state); Review Comment: root 是 pipeline 中负责流式计算的部分,它有可能需要比较大的内存,比如遇到宽列的时候,join 每产生一次输出就需要比较大的内存。 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org