This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3da2dd95ef8 [fix](fe) Fix INSERT INTO local TVF ignoring backend_id during scheduling (#61732)
3da2dd95ef8 is described below

commit 3da2dd95ef87429eaa9449c4877599c68a9648f0
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Wed Mar 25 18:56:30 2026 -0700

    [fix](fe) Fix INSERT INTO local TVF ignoring backend_id during scheduling (#61732)
    
    ### What problem does this PR solve?
    
    Followup #60719
    
    Problem Summary: When using `INSERT INTO local("backend_id" = "X" ...)`,
    the data should only be written to the BE node specified by
    `backend_id`. However, the Coordinator schedules the sink fragment to an
    arbitrary backend because the fragment uses `UNPARTITIONED` partition,
    which causes `SimpleScheduler.getHost()` to pick any available BE. This
    results in file creation failures when the target directory only exists
    on the intended BE.
    
    **Root Cause:**
    - The read path (`SELECT FROM local(...)`) correctly handles this via
    `TVFScanNode.initBackendPolicy()`, restricting the scan to the specified
    backend.
    - The write path (`INSERT INTO local(...)`) had no equivalent logic.
    `PhysicalPlanTranslator.visitPhysicalTVFTableSink()` creates the
    fragment as `UNPARTITIONED`, and `Coordinator.computeFragmentHosts()`
    assigns it to a random BE.
    
    **Fix:**
    Added backend_id-aware scheduling in
    `Coordinator.computeFragmentHosts()` for local `TVFTableSink`, forcing
    the sink fragment to execute on the designated backend. This is
    consistent with the existing `DictionarySink` pattern that also
    overrides fragment scheduling for specific sink types.
    
    **Changes:**
    1. `TVFTableSink.java` - Added `getTvfName()` and `getBackendId()`
    accessor methods
    2. `Coordinator.java` - Added check before UNPARTITIONED scheduling: if
    the sink is a local TVFTableSink with a specific backend_id, force the
    fragment onto that backend
---
 .../java/org/apache/doris/planner/TVFTableSink.java   | 16 ++++++++++++++++
 .../main/java/org/apache/doris/qe/Coordinator.java    | 19 +++++++++++++++++++
 2 files changed, 35 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java
index c511336767c..b8c184ca401 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java
@@ -60,6 +60,22 @@ public class TVFTableSink extends DataSink {
         this.cols = cols;
     }
 
+    public String getTvfName() {
+        return tvfName;
+    }
+
+    /**
+     * Returns the backend_id specified in properties, or -1 if not set.
+     * For local TVF, this indicates the specific BE node where data should be written.
+     */
+    public long getBackendId() {
+        String backendIdStr = properties.get("backend_id");
+        if (backendIdStr != null) {
+            return Long.parseLong(backendIdStr);
+        }
+        return -1;
+    }
+
     public void bindDataSink() throws AnalysisException {
         TTVFTableSink tSink = new TTVFTableSink();
         tSink.setTvfName(tvfName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index dbce9b17256..397ddfb982b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -72,6 +72,7 @@ import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SchemaScanNode;
 import org.apache.doris.planner.SetOperationNode;
 import org.apache.doris.planner.SortNode;
+import org.apache.doris.planner.TVFTableSink;
 import org.apache.doris.planner.UnionNode;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
@@ -1793,6 +1794,24 @@ public class Coordinator implements CoordInterface {
                 // TODO: rethink the whole function logic. could All BE sink naturally merged into other judgements?
                 return;
             }
+            // For local TVF sink with a specific backend_id, we must execute the sink fragment
+            // on the designated backend. Otherwise, data would be written to the wrong node's local disk.
+            if (fragment.getSink() instanceof TVFTableSink) {
+                TVFTableSink tvfSink = (TVFTableSink) fragment.getSink();
+                if ("local".equals(tvfSink.getTvfName()) && tvfSink.getBackendId() != -1) {
+                    Backend targetBackend = Env.getCurrentSystemInfo().getBackend(tvfSink.getBackendId());
+                    if (targetBackend == null || !targetBackend.isAlive()) {
+                        throw new UserException("Backend " + tvfSink.getBackendId()
+                                + " is not available for local TVF sink");
+                    }
+                    TNetworkAddress execHostport = new TNetworkAddress(
+                            targetBackend.getHost(), targetBackend.getBePort());
+                    this.addressToBackendID.put(execHostport, targetBackend.getId());
+                    FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, params);
+                    params.instanceExecParams.add(instanceParam);
+                    continue;
+                }
+            }
 
             if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) {
                 Reference<Long> backendIdRef = new Reference<Long>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to