This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3da2dd95ef8 [fix](fe) Fix INSERT INTO local TVF ignoring backend_id during scheduling (#61732)
3da2dd95ef8 is described below
commit 3da2dd95ef87429eaa9449c4877599c68a9648f0
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Wed Mar 25 18:56:30 2026 -0700
[fix](fe) Fix INSERT INTO local TVF ignoring backend_id during scheduling (#61732)
### What problem does this PR solve?
Follow-up to #60719
Problem Summary: When using `INSERT INTO local("backend_id" = "X" ...)`,
the data should be written only to the BE node specified by
`backend_id`. However, the Coordinator schedules the sink fragment onto an
arbitrary backend: the fragment has an `UNPARTITIONED` data partition,
which causes `SimpleScheduler.getHost()` to pick any available BE. When
the target directory exists only on the intended BE, file creation fails
whenever the fragment lands on any other node.
**Root Cause:**
- The read path (`SELECT FROM local(...)`) correctly handles this via
`TVFScanNode.initBackendPolicy()`, restricting the scan to the specified
backend.
- The write path (`INSERT INTO local(...)`) had no equivalent logic.
`PhysicalPlanTranslator.visitPhysicalTVFTableSink()` creates the
fragment as `UNPARTITIONED`, and `Coordinator.computeFragmentHosts()`
assigns it to a random BE.
**Fix:**
Added `backend_id`-aware scheduling in
`Coordinator.computeFragmentHosts()` for a local `TVFTableSink`, forcing
the sink fragment to execute on the designated backend. This is
consistent with the existing `DictionarySink` pattern, which likewise
overrides fragment scheduling for a specific sink type.
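To make the scheduling rule concrete, here is a simplified, self-contained model of it in plain Java (16+, for records). It is a sketch under stated assumptions, not the Coordinator code: `LocalSinkScheduling`, `BackendInfo`, `SinkInfo`, and `chooseSinkHost` are hypothetical names; `IllegalStateException` stands in for Doris's `UserException`; and the unpinned branch simply takes the first alive backend as a placeholder for whatever `SimpleScheduler.getHost()` actually does. The sketch only returns the chosen `host:port`, whereas the real fix also records the address in `addressToBackendID` and adds an `FInstanceExecParam`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of the fix; every name here is hypothetical, not a Doris class.
class LocalSinkScheduling {
    // Stand-in for a backend entry: id, host, BE port, and liveness.
    record BackendInfo(long id, String host, int bePort, boolean alive) {}

    // Stand-in for a TVF sink: the TVF name plus its string properties.
    record SinkInfo(String tvfName, Map<String, String> properties) {
        // Same convention as the commit: -1 means "backend_id not set".
        long backendId() {
            String value = properties.get("backend_id");
            return value == null ? -1 : Long.parseLong(value);
        }
    }

    /** Returns "host:port" of the backend that should execute the sink fragment. */
    static String chooseSinkHost(SinkInfo sink, Map<Long, BackendInfo> backends) {
        long pinnedId = sink.backendId();
        if ("local".equals(sink.tvfName()) && pinnedId != -1) {
            BackendInfo target = backends.get(pinnedId);
            // Fail loudly instead of falling back to another node's local disk.
            if (target == null || !target.alive()) {
                throw new IllegalStateException(
                        "Backend " + pinnedId + " is not available for local TVF sink");
            }
            return target.host() + ":" + target.bePort();
        }
        // Unpinned sinks: first alive backend (placeholder for the real scheduler).
        BackendInfo any = backends.values().stream()
                .filter(BackendInfo::alive)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("no alive backend"));
        return any.host() + ":" + any.bePort();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the unpinned pick is deterministic.
        Map<Long, BackendInfo> backends = new LinkedHashMap<>();
        backends.put(1L, new BackendInfo(1L, "10.0.0.1", 9060, true));
        backends.put(2L, new BackendInfo(2L, "10.0.0.2", 9060, true));
        backends.put(3L, new BackendInfo(3L, "10.0.0.3", 9060, false));

        SinkInfo pinned = new SinkInfo("local", Map.of("backend_id", "2"));
        SinkInfo unpinned = new SinkInfo("s3", Map.of());
        System.out.println(chooseSinkHost(pinned, backends));   // 10.0.0.2:9060
        System.out.println(chooseSinkHost(unpinned, backends)); // 10.0.0.1:9060
    }
}
```

Failing fast when the pinned backend is missing or dead, rather than falling back to another node, matches the intent of the change: writing on any other BE would put the data on the wrong local disk.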
**Changes:**
1. `TVFTableSink.java` - Added `getTvfName()` and `getBackendId()`
accessor methods
2. `Coordinator.java` - Added a check before the `UNPARTITIONED`
scheduling branch: if the sink is a local `TVFTableSink` with a specific
`backend_id`, force the fragment onto that backend
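The `getBackendId()` accessor from change 1 can be exercised on its own. The class below is a hypothetical standalone copy of its parsing rule over a plain `Map<String, String>` (the real method reads the sink's `properties` field). It shows edge cases that follow directly from the code: an absent `backend_id` yields the `-1` sentinel, so the Coordinator check `getBackendId() != -1` leaves scheduling unchanged; an explicit `"-1"` is indistinguishable from an absent value; and a non-numeric value makes `Long.parseLong` throw `NumberFormatException`. Whether `backend_id` is validated earlier, before the sink is built, is not shown in this commit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone copy of the parsing rule in TVFTableSink.getBackendId().
class BackendIdDemo {
    // Returns the backend_id property as a long, or -1 when it is absent.
    static long getBackendId(Map<String, String> properties) {
        String backendIdStr = properties.get("backend_id");
        if (backendIdStr != null) {
            // Long.parseLong throws NumberFormatException on non-numeric input.
            return Long.parseLong(backendIdStr);
        }
        return -1;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println(getBackendId(props)); // -1: not set, so no pinning
        props.put("backend_id", "10001");
        System.out.println(getBackendId(props)); // 10001
        props.put("backend_id", "be-1");
        try {
            getBackendId(props);
        } catch (NumberFormatException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```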
---
.../java/org/apache/doris/planner/TVFTableSink.java | 16 ++++++++++++++++
.../main/java/org/apache/doris/qe/Coordinator.java | 19 +++++++++++++++++++
2 files changed, 35 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java
index c511336767c..b8c184ca401 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java
@@ -60,6 +60,22 @@ public class TVFTableSink extends DataSink {
this.cols = cols;
}
+ public String getTvfName() {
+ return tvfName;
+ }
+
+ /**
+ * Returns the backend_id specified in properties, or -1 if not set.
+ * For local TVF, this indicates the specific BE node where data should be written.
+ */
+ public long getBackendId() {
+ String backendIdStr = properties.get("backend_id");
+ if (backendIdStr != null) {
+ return Long.parseLong(backendIdStr);
+ }
+ return -1;
+ }
+
public void bindDataSink() throws AnalysisException {
TTVFTableSink tSink = new TTVFTableSink();
tSink.setTvfName(tvfName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index dbce9b17256..397ddfb982b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -72,6 +72,7 @@ import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SchemaScanNode;
import org.apache.doris.planner.SetOperationNode;
import org.apache.doris.planner.SortNode;
+import org.apache.doris.planner.TVFTableSink;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
@@ -1793,6 +1794,24 @@ public class Coordinator implements CoordInterface {
// TODO: rethink the whole function logic. could All BE sink naturally merged into other judgements?
return;
}
+ // For local TVF sink with a specific backend_id, we must execute the sink fragment
+ // on the designated backend. Otherwise, data would be written to the wrong node's local disk.
+ if (fragment.getSink() instanceof TVFTableSink) {
+ TVFTableSink tvfSink = (TVFTableSink) fragment.getSink();
+ if ("local".equals(tvfSink.getTvfName()) && tvfSink.getBackendId() != -1) {
+ Backend targetBackend = Env.getCurrentSystemInfo().getBackend(tvfSink.getBackendId());
+ if (targetBackend == null || !targetBackend.isAlive()) {
+ throw new UserException("Backend " + tvfSink.getBackendId()
+ + " is not available for local TVF sink");
+ }
+ TNetworkAddress execHostport = new TNetworkAddress(
+ targetBackend.getHost(), targetBackend.getBePort());
+ this.addressToBackendID.put(execHostport, targetBackend.getId());
+ FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, params);
+ params.instanceExecParams.add(instanceParam);
+ continue;
+ }
+ }
if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) {
Reference<Long> backendIdRef = new Reference<Long>();