Copilot commented on code in PR #64423:
URL: https://github.com/apache/doris/pull/64423#discussion_r3427868923


##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -168,13 +168,36 @@ private void sendWriteRequest() throws JobException {
             log.warn("cdc_client RPC timeout api=/api/writeRecords taskId={} jobId={} backend={}:{} timeout_sec={}",
                     taskId, getJobId(), backend.getHost(), backend.getBrpcPort(),
                     Config.streaming_cdc_heavy_rpc_timeout_sec);
+            // the request may have been dispatched and still running remotely
+            noRetry = true;
             throw new JobException("cdc_client RPC timeout: /api/writeRecords taskId=" + taskId);
         } catch (ExecutionException | InterruptedException ex) {
+            if (ex instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
             log.error("Send write request failed: ", ex);
+            noRetry = true;
             throw new JobException(ex);
         }
     }
 
+    private Backend resolveBackend() throws JobException {
+        // Snapshot phase keeps per-round selection; binlog phase binds to a fixed BE for reuse.
+        if (((JdbcOffset) runningOffset).snapshotSplit()) {
+            return StreamingJobUtils.selectBackend(cloudCluster);
+        }
+        return getStreamingJob().resolveBoundBackend();
+    }
+
+    // Fail loud on a dropped/wrong-type job rather than return null and risk a downstream NPE.
+    private StreamingInsertJob getStreamingJob() throws JobException {
+        Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
+        if (job == null) {
+            throw new JobException("Streaming job " + getJobId() + " not found");
+        }
+        return (StreamingInsertJob) job;

Review Comment:
   `getStreamingJob()` unconditionally casts `Job` to `StreamingInsertJob`. If 
the job has been dropped and replaced, or the ID is reused for another job 
type, the cast throws a `ClassCastException` rather than a `JobException`, 
which surfaces as an unhelpful task failure even though the method's own 
comment promises to fail loudly on a wrong-type job. Add an `instanceof` 
guard and throw a clear `JobException` instead.
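
   A minimal sketch of the suggested guard, written so it compiles on its own. 
The `Job`, `StreamingInsertJob`, and `JobException` types below are simplified 
stand-ins for the Doris classes, and the `JOBS` map is a hypothetical 
replacement for the `Env.getCurrentEnv().getJobManager()` lookup; only the 
shape of the `instanceof` check is the point.

```java
import java.util.HashMap;
import java.util.Map;

class StreamingJobGuardSketch {
    // Simplified stand-ins for the Doris types named in the diff (assumptions, not the real classes).
    static class JobException extends Exception {
        JobException(String message) {
            super(message);
        }
    }

    static class Job {
    }

    static class StreamingInsertJob extends Job {
    }

    // Hypothetical registry standing in for the job manager lookup.
    static final Map<Long, Job> JOBS = new HashMap<>();

    static StreamingInsertJob getStreamingJob(long jobId) throws JobException {
        Job job = JOBS.get(jobId);
        if (job == null) {
            throw new JobException("Streaming job " + jobId + " not found");
        }
        // Guard the cast so a wrong-type job yields a JobException, not a ClassCastException.
        if (!(job instanceof StreamingInsertJob)) {
            throw new JobException("Job " + jobId + " is not a StreamingInsertJob, actual type: "
                    + job.getClass().getSimpleName());
        }
        return (StreamingInsertJob) job;
    }
}
```

   With this shape, both the missing-job case and the wrong-type case reach the 
caller as a `JobException` carrying the job ID.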



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java:
##########
@@ -118,7 +119,7 @@ public Object fetchEndOffset(@RequestBody JobBaseConfig jobConfig) {
     @RequestMapping(path = "/api/compareOffset", method = RequestMethod.POST)
     public Object compareOffset(@RequestBody CompareOffsetRequest compareOffsetRequest) {
         try {
-            SourceReader reader = Env.getCurrentEnv().getReader(compareOffsetRequest);
+            SourceReader reader = Env.getCurrentEnv().getMetaReader(compareOffsetRequest);
             return RestResponse.success(reader.compareOffset(compareOffsetRequest));

Review Comment:
   `/api/compareOffset` is also used as a job heartbeat, but unlike 
`/api/fetchEndOffset` it doesn’t call `Env.keepAlive(jobId)`. This can let the 
idle-reader reaper reclaim an otherwise-active incremental reader and force an 
unnecessary rebuild/re-location.
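
   The interaction can be illustrated with a small, self-contained model. 
Everything here is hypothetical: the class, the timestamp map, and the 
`reapIdle` method are assumptions made for illustration and do not reflect how 
`Env.keepAlive(jobId)` or the reaper are actually implemented in cdc_client. 
The sketch only shows why a heartbeat endpoint that skips the keep-alive call 
leaves the reader looking idle.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ReaderKeepAliveSketch {
    // Last activity timestamp per job ID (hypothetical bookkeeping).
    private final Map<String, Long> lastActiveMillis = new ConcurrentHashMap<>();
    private final long idleTimeoutMillis;

    ReaderKeepAliveSketch(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Records that the job's reader was used at the given time.
    void keepAlive(String jobId, long nowMillis) {
        lastActiveMillis.put(jobId, nowMillis);
    }

    // Stand-in for /api/fetchEndOffset, which already refreshes the timestamp.
    void fetchEndOffset(String jobId, long nowMillis) {
        keepAlive(jobId, nowMillis);
    }

    // Stand-in for /api/compareOffset with the suggested keep-alive call added.
    void compareOffset(String jobId, long nowMillis) {
        keepAlive(jobId, nowMillis);
    }

    // Removes readers idle for longer than the timeout and returns how many were reclaimed.
    int reapIdle(long nowMillis) {
        int before = lastActiveMillis.size();
        lastActiveMillis.entrySet().removeIf(e -> nowMillis - e.getValue() > idleTimeoutMillis);
        return before - lastActiveMillis.size();
    }

    boolean hasReader(String jobId) {
        return lastActiveMillis.containsKey(jobId);
    }
}
```

   In this model, a job that only sends `compareOffset` heartbeats keeps its 
reader as long as each heartbeat arrives within the idle timeout; without the 
`keepAlive` call in `compareOffset`, the same job would be reaped once the last 
`fetchEndOffset` call aged out.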



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

