yiguolei commented on code in PR #47462:
URL: https://github.com/apache/doris/pull/47462#discussion_r1966504919


##########
be/src/pipeline/pipeline_task.cpp:
##########
@@ -340,36 +370,127 @@ Status PipelineTask::execute(bool* eos) {
         _block->clear_column_data(_root->row_desc().num_materialized_slots());
         auto* block = _block.get();
 
-        auto sink_revocable_mem_size = _sink->revocable_mem_size(_state);
-        if (should_revoke_memory(_state, sink_revocable_mem_size)) {
-            RETURN_IF_ERROR(_sink->revoke_memory(_state));
-            continue;
-        }
         DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
             Status status =
                     Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task 
executing failed");
             return status;
         });
+
         // `_sink->is_finished(_state)` means sink operator should be finished
         if (_sink->is_finished(_state)) {
             set_wake_up_and_dep_ready();
         }
 
         // `_dry_run` means sink operator need no more data
         *eos = wake_up_early() || _dry_run;
-        if (!*eos) {
+        auto workload_group = _state->get_query_ctx()->workload_group();
+        if (*eos) {
+            _pending_block.reset();
+        } else if (_pending_block) [[unlikely]] {
+            LOG(INFO) << "Query: " << print_id(query_id)
+                      << " has pending block, size: " << 
_pending_block->allocated_bytes();
+            _block = std::move(_pending_block);
+            block = _block.get();
+            *eos = _pending_eos;
+        } else {
             SCOPED_TIMER(_get_block_timer);
+            if (_state->low_memory_mode()) {
+                _sink->set_low_memory_mode(_state);
+                _root->set_low_memory_mode(_state);
+            }
+            DEFER_RELEASE_RESERVED();
             _get_block_counter->update(1);
-            RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, 
eos));
-        }
+            const auto reserve_size = _root->get_reserve_mem_size(_state);

Review Comment:
   root 是pipeline 中负责流式计算的部分,他有可能需要比较大的内存,比如一个宽列的时候,join 产生一次就需要比较大的内存。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to