Copilot commented on code in PR #61739:
URL: https://github.com/apache/doris/pull/61739#discussion_r2992162719


##########
be/src/exec/operator/result_sink_operator.cpp:
##########
@@ -199,9 +199,13 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) {
         }
         RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows));
     }
+    // In parallel result sink mode, the buffer is registered under query_id; otherwise
+    // it is registered under fragment_instance_id. Pass the matching key so the
+    // deferred cancel actually finds and removes the buffer entry.
     state->exec_env()->result_mgr()->cancel_at_time(
             time(nullptr) + config::result_buffer_cancelled_interval_time,
-            state->fragment_instance_id());
+            state->query_options().enable_parallel_result_sink ? state->query_id()
+                                                               : state->fragment_instance_id());

Review Comment:
   In parallel result sink mode, multiple fragment instances share the same ResultBlockBuffer keyed by query_id, and `close()` is invoked once per fragment instance (each call removing one dependency). Scheduling `cancel_at_time(..., query_id)` on every instance close can therefore cancel and erase the shared buffer once the interval elapses, even while other instances are still producing results (i.e., before the buffer is actually fully closed), which can break result fetching and/or cancel in-flight execution. Consider scheduling the deferred cancel only when the shared buffer transitions to fully closed (after the last dependency is removed), e.g., by moving this scheduling into `ResultBlockBuffer::close` when `_result_sink_dependencies` becomes empty, or by extending the `ResultBlockBufferBase::close` contract to indicate "final close" so callers can safely schedule the cancel exactly once.



##########
be/src/exec/operator/result_file_sink_operator.cpp:
##########
@@ -135,9 +135,13 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
         state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(written_rows);
         RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows));
     }
+    // In parallel outfile mode, the buffer is registered under query_id; otherwise
+    // it is registered under fragment_instance_id. Pass the matching key so the
+    // deferred cancel actually finds and removes the buffer entry.
     state->exec_env()->result_mgr()->cancel_at_time(
             time(nullptr) + config::result_buffer_cancelled_interval_time,
-            state->fragment_instance_id());
+            state->query_options().enable_parallel_outfile ? state->query_id()
+                                                           : state->fragment_instance_id());

Review Comment:
   Same concern as for the result sink: in parallel outfile mode the buffer is shared under `query_id`, and `ResultBlockBuffer::close` only fully closes once all fragment-instance dependencies have been removed. Calling `cancel_at_time(..., query_id)` from every fragment-instance close can cancel the shared buffer after the interval even if some instances are still running, potentially breaking the client fetch and/or canceling the remaining producers. Consider deferring the scheduling until the buffer is fully closed (e.g., schedule inside `ResultBlockBuffer::close` when the last dependency is removed, or change `close()` to return/indicate when it performed the final close so the cancel can be scheduled once).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]