Copilot commented on code in PR #60313:
URL: https://github.com/apache/doris/pull/60313#discussion_r2735959692


##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1446,31 +1446,24 @@ Status FragmentMgr::transmit_rec_cte_block(
 
 Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment,
                                    PRerunFragmentParams_Opcode stage) {
-    if (auto q_ctx = get_query_ctx(query_id)) {
-        SCOPED_ATTACH_TASK(q_ctx.get());
-        auto fragment_ctx = _pipeline_map.find({query_id, fragment});
-        if (!fragment_ctx) {
-            return Status::NotFound("Fragment context (query-id: {}, 
fragment-id: {}) not found",
-                                    print_id(query_id), fragment);
-        }
+    auto fragment_ctx = _pipeline_map.find({query_id, fragment});
+    if (!fragment_ctx) {
+        return Status::NotFound("Fragment context (query-id: {}, fragment-id: 
{}) not found",
+                                print_id(query_id), fragment);
+    }
 
-        if (stage == PRerunFragmentParams::wait) {
-            return fragment_ctx->wait_close(false);
-        } else if (stage == PRerunFragmentParams::release) {
-            return fragment_ctx->set_to_rerun();
-        } else if (stage == PRerunFragmentParams::rebuild) {
-            return fragment_ctx->rebuild(_thread_pool.get());
-        } else if (stage == PRerunFragmentParams::submit) {
-            return fragment_ctx->submit();
-        } else if (stage == PRerunFragmentParams::close) {
-            return fragment_ctx->wait_close(true);
-        } else {
-            return Status::InvalidArgument("Unknown rerun fragment opcode: 
{}", stage);
-        }
+    if (stage == PRerunFragmentParams::wait) {
+        return fragment_ctx->wait_close(false);
+    } else if (stage == PRerunFragmentParams::release) {
+        return fragment_ctx->set_to_rerun();
+    } else if (stage == PRerunFragmentParams::rebuild) {
+        return fragment_ctx->rebuild(_thread_pool.get());
+    } else if (stage == PRerunFragmentParams::submit) {

Review Comment:
   `rerun_fragment()` no longer attaches a task context, and the `submit` stage 
now calls `fragment_ctx->submit()` without any `SCOPED_ATTACH_TASK`. The other 
stages (`wait_close`/`set_to_rerun`/`rebuild`) now attach the task context 
inside the callee, but `submit()` does not, so the thread context, signal task 
id, and memory tracking are inconsistent on the submit path. Consider attaching 
in this caller for the `submit` branch via 
`SCOPED_ATTACH_TASK(fragment_ctx->get_query_ctx())` (or equivalent) before 
calling `submit()`.
   ```suggestion
       } else if (stage == PRerunFragmentParams::submit) {
           SCOPED_ATTACH_TASK(fragment_ctx->get_query_ctx());
   ```



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1446,31 +1446,24 @@ Status FragmentMgr::transmit_rec_cte_block(
 
 Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment,
                                    PRerunFragmentParams_Opcode stage) {
-    if (auto q_ctx = get_query_ctx(query_id)) {
-        SCOPED_ATTACH_TASK(q_ctx.get());
-        auto fragment_ctx = _pipeline_map.find({query_id, fragment});
-        if (!fragment_ctx) {
-            return Status::NotFound("Fragment context (query-id: {}, 
fragment-id: {}) not found",
-                                    print_id(query_id), fragment);
-        }
+    auto fragment_ctx = _pipeline_map.find({query_id, fragment});
+    if (!fragment_ctx) {
+        return Status::NotFound("Fragment context (query-id: {}, fragment-id: 
{}) not found",
+                                print_id(query_id), fragment);
+    }
 
-        if (stage == PRerunFragmentParams::wait) {
-            return fragment_ctx->wait_close(false);
-        } else if (stage == PRerunFragmentParams::release) {
-            return fragment_ctx->set_to_rerun();
-        } else if (stage == PRerunFragmentParams::rebuild) {
-            return fragment_ctx->rebuild(_thread_pool.get());
-        } else if (stage == PRerunFragmentParams::submit) {
-            return fragment_ctx->submit();
-        } else if (stage == PRerunFragmentParams::close) {
-            return fragment_ctx->wait_close(true);
-        } else {
-            return Status::InvalidArgument("Unknown rerun fragment opcode: 
{}", stage);
-        }
+    if (stage == PRerunFragmentParams::wait) {
+        return fragment_ctx->wait_close(false);
+    } else if (stage == PRerunFragmentParams::release) {
+        return fragment_ctx->set_to_rerun();
+    } else if (stage == PRerunFragmentParams::rebuild) {
+        return fragment_ctx->rebuild(_thread_pool.get());
+    } else if (stage == PRerunFragmentParams::submit) {
+        return fragment_ctx->submit();
+    } else if (stage == PRerunFragmentParams::close) {
+        return fragment_ctx->wait_close(true);
     } else {
-        return Status::NotFound(
-                "reset_fragment: Query context (query-id: {}) not found, maybe 
finished",
-                print_id(query_id));
+        return Status::InvalidArgument("Unknown rerun fragment opcode: {}", 
stage);
     }
     return Status::OK();

Review Comment:
   This `return Status::OK();` is unreachable because every branch above 
returns. Removing it would eliminate dead code and make the control flow clearer.



##########
be/src/pipeline/exec/rec_cte_source_operator.h:
##########
@@ -193,15 +194,15 @@ class RecCTESourceOperatorX : public 
OperatorX<RecCTESourceLocalState> {
             stub->rerun_fragment(&controller, &request, &result, 
brpc::DoNothing());
             brpc::Join(controller.call_id());
             if (controller.Failed()) {
-                return Status::InternalError(controller.ErrorText());
+                st = Status::InternalError(controller.ErrorText());
             }
 
             auto rpc_st = Status::create(result.status());
             if (!rpc_st.ok()) {

Review Comment:
   `_send_rerun_fragments()` now continues past RPC failures but overwrites `st` 
on each error. The final returned status therefore reflects only the last 
failing fragment, which can hide the original (root-cause) error and the 
identity of the fragment that first failed. Also, when `controller.Failed()` is 
true, `result.status()` is not meaningful. Consider skipping the 
`Status::create(result.status())` path for that iteration (e.g., with 
`continue`). Consider also preserving the first failure, or aggregating errors 
with each fragment's id/address, instead of overwriting `st`.
   ```suggestion
                   if (st.ok()) {
                       st = Status::InternalError(controller.ErrorText());
                   }
                   // When the RPC controller reports failure, the result status is not reliable.
                   continue;
               }
   
               auto rpc_st = Status::create(result.status());
               if (!rpc_st.ok() && st.ok()) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to