yiguolei commented on code in PR #45207:
URL: https://github.com/apache/doris/pull/45207#discussion_r1886082723


##########
be/src/pipeline/pipeline_task.cpp:
##########
@@ -361,47 +354,44 @@ Status PipelineTask::execute(bool* eos) {
             RETURN_IF_ERROR(_sink->revoke_memory(_state));
             continue;
         }
-        *eos = _eos;
         DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
             Status status =
                     Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task 
executing failed");
             return status;
         });
-        // `_dry_run` means sink operator need no more data
         // `_sink->is_finished(_state)` means sink operator should be finished
-        if (_dry_run || _sink->is_finished(_state)) {
-            *eos = true;
-            _eos = true;
-        } else {
+        if (_sink->is_finished(_state)) {
+            set_wake_up_and_dep_ready();
+        }
+
+        // `_dry_run` means sink operator need no more data
+        *eos = wake_up_early() || _dry_run;
+        if (!*eos) {
             SCOPED_TIMER(_get_block_timer);
             _get_block_counter->update(1);
             RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, 
eos));
         }
 
         if (_block->rows() != 0 || *eos) {
             SCOPED_TIMER(_sink_timer);
-            Status status = Status::OK();
-            // Define a lambda function to catch sink exception, because sink 
will check
-            // return error status with EOF, it is special, could not return 
directly.
-            auto sink_function = [&]() -> Status {
-                Status internal_st;
-                internal_st = _sink->sink(_state, block, *eos);
-                return internal_st;
-            };
-            status = sink_function();
-            if (!status.is<ErrorCode::END_OF_FILE>()) {
-                RETURN_IF_ERROR(status);
+            Status status = _sink->sink(_state, block, *eos);
+
+            if (status.is<ErrorCode::END_OF_FILE>()) {
+                set_wake_up_and_dep_ready();
+            } else if (!status) {
+                return status;
             }
-            *eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos;
+
             if (*eos) { // just return, the scheduler will do finish work
-                _eos = true;
+                RETURN_IF_ERROR(close(status, false));

Review Comment:
   这个代码,应该在377 行之前,不能在sink 之后



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to