This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new f3a42faf406 [branch-2.1][improvement](jdbc catalog) support jdbc 
external catalog insert stmt  in nereids (#40902)
f3a42faf406 is described below

commit f3a42faf4064695f6df781a62d62afff9e1ca0fb
Author: zy-kkk <zhongy...@gmail.com>
AuthorDate: Wed Sep 18 14:02:20 2024 +0800

    [branch-2.1][improvement](jdbc catalog) support jdbc external catalog 
insert stmt  in nereids (#40902)
    
    pick  (#39813)
---
 .../nereids/analyzer/UnboundJdbcTableSink.java     |  84 ++++++++++++
 .../nereids/analyzer/UnboundTableSinkCreator.java  |  18 +--
 .../glue/translator/PhysicalPlanTranslator.java    |  20 +++
 .../pre/TurnOffPageCacheForInsertIntoSelect.java   |   8 ++
 .../nereids/properties/RequestPropertyDeriver.java |   9 ++
 .../org/apache/doris/nereids/rules/RuleSet.java    |   2 +
 .../org/apache/doris/nereids/rules/RuleType.java   |   2 +
 .../doris/nereids/rules/analysis/BindSink.java     |  77 ++++++++++-
 ...ogicalJdbcTableSinkToPhysicalJdbcTableSink.java |  48 +++++++
 .../apache/doris/nereids/trees/plans/PlanType.java |   3 +
 .../insert/BaseExternalTableInsertExecutor.java    |   2 +-
 .../commands/insert/InsertIntoTableCommand.java    |  27 +++-
 .../trees/plans/commands/insert/InsertUtils.java   |   9 ++
 .../commands/insert/JdbcInsertCommandContext.java} |  10 +-
 .../plans/commands/insert/JdbcInsertExecutor.java  | 113 +++++++++++++++
 .../trees/plans/logical/LogicalJdbcTableSink.java  | 151 +++++++++++++++++++++
 .../physical/PhysicalBaseExternalTableSink.java    |   4 +
 .../plans/physical/PhysicalJdbcTableSink.java      | 109 +++++++++++++++
 .../nereids/trees/plans/visitor/SinkVisitor.java   |  15 ++
 ...actionType.java => JdbcTransactionManager.java} |  26 +++-
 .../apache/doris/transaction/TransactionType.java  |   3 +-
 21 files changed, 718 insertions(+), 22 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundJdbcTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundJdbcTableSink.java
new file mode 100644
index 00000000000..53367cf9c21
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundJdbcTableSink.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.analyzer;
+
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Represent an jdbc table sink plan node that has not been bound.
+ */
+public class UnboundJdbcTableSink<CHILD_TYPE extends Plan> extends 
UnboundBaseExternalTableSink<CHILD_TYPE> {
+
+    public UnboundJdbcTableSink(List<String> nameParts, List<String> colNames, 
List<String> hints,
+            List<String> partitions, CHILD_TYPE child) {
+        this(nameParts, colNames, hints, partitions, DMLCommandType.NONE,
+                Optional.empty(), Optional.empty(), child);
+    }
+
+    /**
+     * constructor
+     */
+    public UnboundJdbcTableSink(List<String> nameParts,
+            List<String> colNames,
+            List<String> hints,
+            List<String> partitions,
+            DMLCommandType dmlCommandType,
+            Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties,
+            CHILD_TYPE child) {
+        super(nameParts, PlanType.LOGICAL_UNBOUND_JDBC_TABLE_SINK, 
ImmutableList.of(), groupExpression,
+                logicalProperties, colNames, dmlCommandType, child, hints, 
partitions);
+    }
+
+    @Override
+    public Plan withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "UnboundJdbcTableSink should have exactly one child");
+        return new UnboundJdbcTableSink<>(nameParts, colNames, hints, 
partitions,
+                dmlCommandType, Optional.empty(), Optional.empty(), 
children.get(0));
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitUnboundJdbcTableSink(this, context);
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
+        return new UnboundJdbcTableSink<>(nameParts, colNames, hints, 
partitions,
+                dmlCommandType, groupExpression, 
Optional.of(getLogicalProperties()), child());
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
+        return new UnboundJdbcTableSink<>(nameParts, colNames, hints, 
partitions,
+                dmlCommandType, groupExpression, logicalProperties, 
children.get(0));
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
index e1c771b6a4c..8ca58f97757 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
@@ -23,6 +23,7 @@ import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.exceptions.ParseException;
 import org.apache.doris.nereids.trees.plans.Plan;
@@ -78,6 +79,9 @@ public class UnboundTableSinkCreator {
         } else if (curCatalog instanceof IcebergExternalCatalog) {
             return new UnboundIcebergTableSink<>(nameParts, colNames, hints, 
partitions,
                     dmlCommandType, Optional.empty(), Optional.empty(), plan);
+        } else if (curCatalog instanceof JdbcExternalCatalog) {
+            return new UnboundJdbcTableSink<>(nameParts, colNames, hints, 
partitions,
+                    dmlCommandType, Optional.empty(), Optional.empty(), plan);
         }
         throw new RuntimeException("Load data to " + 
curCatalog.getClass().getSimpleName() + " is not supported.");
     }
@@ -109,16 +113,12 @@ public class UnboundTableSinkCreator {
                     dmlCommandType, Optional.empty(), Optional.empty(), plan);
         } else if (curCatalog instanceof IcebergExternalCatalog && 
!isAutoDetectPartition) {
             return new UnboundIcebergTableSink<>(nameParts, colNames, hints, 
partitions,
-                dmlCommandType, Optional.empty(), Optional.empty(), plan);
-        }
-        // TODO: we need to support insert into other catalog
-        try {
-            if (ConnectContext.get() != null) {
-                
ConnectContext.get().getSessionVariable().enableFallbackToOriginalPlannerOnce();
-            }
-        } catch (Exception e) {
-            // ignore this.
+                    dmlCommandType, Optional.empty(), Optional.empty(), plan);
+        } else if (curCatalog instanceof JdbcExternalCatalog) {
+            return new UnboundJdbcTableSink<>(nameParts, colNames, hints, 
partitions,
+                    dmlCommandType, Optional.empty(), Optional.empty(), plan);
         }
+
         throw new AnalysisException(
                 (isOverwrite ? "insert overwrite" : "insert") + " data to " + 
curCatalog.getClass().getSimpleName()
                         + " is not supported."
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 78a6a083090..c2de6c0818d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -57,6 +57,7 @@ import org.apache.doris.datasource.hudi.source.HudiScanNode;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
 import org.apache.doris.datasource.jdbc.JdbcExternalTable;
+import org.apache.doris.datasource.jdbc.sink.JdbcTableSink;
 import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
 import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
 import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode;
@@ -125,6 +126,7 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalHudiScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
@@ -489,6 +491,24 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         return rootFragment;
     }
 
+    @Override
+    public PlanFragment visitPhysicalJdbcTableSink(PhysicalJdbcTableSink<? 
extends Plan> jdbcTableSink,
+            PlanTranslatorContext context) {
+        PlanFragment rootFragment = jdbcTableSink.child().accept(this, 
context);
+        rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
+        List<Column> targetTableColumns = jdbcTableSink.getCols();
+        List<String> insertCols = targetTableColumns.stream()
+                .map(Column::getName)
+                .collect(Collectors.toList());
+
+        JdbcTableSink sink = new JdbcTableSink(
+                ((JdbcExternalTable) 
jdbcTableSink.getTargetTable()).getJdbcTable(),
+                insertCols
+        );
+        rootFragment.setSink(sink);
+        return rootFragment;
+    }
+
     @Override
     public PlanFragment visitPhysicalFileSink(PhysicalFileSink<? extends Plan> 
fileSink,
             PlanTranslatorContext context) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java
index ab817c2f1d7..2479af68fbe 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java
@@ -26,6 +26,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.VariableMgr;
@@ -67,6 +68,13 @@ public class TurnOffPageCacheForInsertIntoSelect extends 
PlanPreprocessor {
         return tableSink;
     }
 
+    @Override
+    public Plan visitLogicalJdbcTableSink(
+            LogicalJdbcTableSink<? extends Plan> tableSink, StatementContext 
context) {
+        turnOffPageCache(context);
+        return tableSink;
+    }
+
     private void turnOffPageCache(StatementContext context) {
         SessionVariable sessionVariable = 
context.getConnectContext().getSessionVariable();
         // set temporary session value, and then revert value in the 'finally 
block' of StmtExecutor#execute
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
index 750707c52c4..ee3d8ee3124 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
@@ -40,6 +40,7 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
@@ -152,6 +153,14 @@ public class RequestPropertyDeriver extends 
PlanVisitor<Void, PlanContext> {
         return null;
     }
 
+    @Override
+    public Void visitPhysicalJdbcTableSink(
+            PhysicalJdbcTableSink<? extends Plan> jdbcTableSink, PlanContext 
context) {
+        // Always use gather properties for jdbcTableSink
+        addRequestPropertyToChildren(PhysicalProperties.GATHER);
+        return null;
+    }
+
     @Override
     public Void visitPhysicalResultSink(PhysicalResultSink<? extends Plan> 
physicalResultSink, PlanContext context) {
         addRequestPropertyToChildren(PhysicalProperties.GATHER);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
index 26920764d89..26dfa1bfcb7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
@@ -76,6 +76,7 @@ import 
org.apache.doris.nereids.rules.implementation.LogicalHudiScanToPhysicalHu
 import 
org.apache.doris.nereids.rules.implementation.LogicalIcebergTableSinkToPhysicalIcebergTableSink;
 import 
org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect;
 import 
org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan;
+import 
org.apache.doris.nereids.rules.implementation.LogicalJdbcTableSinkToPhysicalJdbcTableSink;
 import org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin;
 import 
org.apache.doris.nereids.rules.implementation.LogicalJoinToNestedLoopJoin;
 import 
org.apache.doris.nereids.rules.implementation.LogicalLimitToPhysicalLimit;
@@ -202,6 +203,7 @@ public class RuleSet {
             .add(new LogicalOlapTableSinkToPhysicalOlapTableSink())
             .add(new LogicalHiveTableSinkToPhysicalHiveTableSink())
             .add(new LogicalIcebergTableSinkToPhysicalIcebergTableSink())
+            .add(new LogicalJdbcTableSinkToPhysicalJdbcTableSink())
             .add(new LogicalFileSinkToPhysicalFileSink())
             .add(new LogicalResultSinkToPhysicalResultSink())
             .add(new 
LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink())
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 082ee72fbed..b2d84679f23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -32,6 +32,7 @@ public enum RuleType {
     BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
     BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE),
     BINDING_INSERT_ICEBERG_TABLE(RuleTypeClass.REWRITE),
+    BINDING_INSERT_JDBC_TABLE(RuleTypeClass.REWRITE),
     BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
     INIT_MATERIALIZATION_HOOK_FOR_FILE_SINK(RuleTypeClass.REWRITE),
     INIT_MATERIALIZATION_HOOK_FOR_TABLE_SINK(RuleTypeClass.REWRITE),
@@ -428,6 +429,7 @@ public enum RuleType {
     
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
     
LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
     
LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
+    
LOGICAL_JDBC_TABLE_SINK_TO_PHYSICAL_JDBC_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
     
LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
     
LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
index 793ed5cc8f8..4a9660a5144 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
@@ -31,10 +31,13 @@ import org.apache.doris.datasource.hive.HMSExternalDatabase;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
+import org.apache.doris.datasource.jdbc.JdbcExternalTable;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.analyzer.Scope;
 import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
 import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
+import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink;
 import org.apache.doris.nereids.analyzer.UnboundSlot;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -57,6 +60,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
 import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
@@ -108,7 +112,8 @@ public class BindSink implements AnalysisRuleFactory {
                 // TODO: bind hive taget table
                 
RuleType.BINDING_INSERT_HIVE_TABLE.build(unboundHiveTableSink().thenApply(this::bindHiveTableSink)),
                 RuleType.BINDING_INSERT_ICEBERG_TABLE.build(
-                    
unboundIcebergTableSink().thenApply(this::bindIcebergTableSink))
+                    
unboundIcebergTableSink().thenApply(this::bindIcebergTableSink)),
+                
RuleType.BINDING_INSERT_JDBC_TABLE.build(unboundJdbcTableSink().thenApply(this::bindJdbcTableSink))
         );
     }
 
@@ -502,6 +507,64 @@ public class BindSink implements AnalysisRuleFactory {
         return boundSink.withChildAndUpdateOutput(fullOutputProject);
     }
 
+    private Plan bindJdbcTableSink(MatchingContext<UnboundJdbcTableSink<Plan>> 
ctx) {
+        UnboundJdbcTableSink<?> sink = ctx.root;
+        Pair<JdbcExternalDatabase, JdbcExternalTable> pair = 
bind(ctx.cascadesContext, sink);
+        JdbcExternalDatabase database = pair.first;
+        JdbcExternalTable table = pair.second;
+        LogicalPlan child = ((LogicalPlan) sink.child());
+
+        List<Column> bindColumns;
+        if (sink.getColNames().isEmpty()) {
+            bindColumns = 
table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList());
+        } else {
+            bindColumns = sink.getColNames().stream().map(cn -> {
+                Column column = table.getColumn(cn);
+                if (column == null) {
+                    throw new AnalysisException(String.format("column %s is 
not found in table %s",
+                            cn, table.getName()));
+                }
+                return column;
+            }).collect(ImmutableList.toImmutableList());
+        }
+        LogicalJdbcTableSink<?> boundSink = new LogicalJdbcTableSink<>(
+                database,
+                table,
+                bindColumns,
+                child.getOutput().stream()
+                        .map(NamedExpression.class::cast)
+                        .collect(ImmutableList.toImmutableList()),
+                sink.getDMLCommandType(),
+                Optional.empty(),
+                Optional.empty(),
+                child);
+        // we need to insert all the columns of the target table
+        if (boundSink.getCols().size() != child.getOutput().size()) {
+            throw new AnalysisException("insert into cols should be 
corresponding to the query output");
+        }
+        Map<String, NamedExpression> columnToOutput = 
getJdbcColumnToOutput(bindColumns, child);
+        // We don't need to insert unmentioned columns, only user specified 
columns
+        LogicalProject<?> outputProject = 
getOutputProjectByCoercion(bindColumns, child, columnToOutput);
+        return boundSink.withChildAndUpdateOutput(outputProject);
+    }
+
+    private static Map<String, NamedExpression> getJdbcColumnToOutput(
+            List<Column> bindColumns, LogicalPlan child) {
+        Map<String, NamedExpression> columnToOutput = 
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+
+        for (int i = 0; i < bindColumns.size(); i++) {
+            Column column = bindColumns.get(i);
+            NamedExpression outputExpr = child.getOutput().get(i);
+            Alias output = new Alias(
+                    TypeCoercionUtils.castIfNotSameType(outputExpr, 
DataType.fromCatalogType(column.getType())),
+                    column.getName()
+            );
+            columnToOutput.put(column.getName(), output);
+        }
+
+        return columnToOutput;
+    }
+
     private Pair<Database, OlapTable> bind(CascadesContext cascadesContext, 
UnboundTableSink<? extends Plan> sink) {
         List<String> tableQualifier = 
RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
                 sink.getNameParts());
@@ -545,6 +608,18 @@ public class BindSink implements AnalysisRuleFactory {
         throw new AnalysisException("the target table of insert into is not an 
iceberg table");
     }
 
+    private Pair<JdbcExternalDatabase, JdbcExternalTable> bind(CascadesContext 
cascadesContext,
+            UnboundJdbcTableSink<? extends Plan> sink) {
+        List<String> tableQualifier = 
RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
+                sink.getNameParts());
+        Pair<DatabaseIf<?>, TableIf> pair = 
RelationUtil.getDbAndTable(tableQualifier,
+                cascadesContext.getConnectContext().getEnv());
+        if (pair.second instanceof JdbcExternalTable) {
+            return Pair.of(((JdbcExternalDatabase) pair.first), 
(JdbcExternalTable) pair.second);
+        }
+        throw new AnalysisException("the target table of insert into is not an 
jdbc table");
+    }
+
     private List<Long> bindPartitionIds(OlapTable table, List<String> 
partitions, boolean temp) {
         return partitions.isEmpty()
                 ? ImmutableList.of()
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJdbcTableSinkToPhysicalJdbcTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJdbcTableSinkToPhysicalJdbcTableSink.java
new file mode 100644
index 00000000000..960350c6117
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJdbcTableSinkToPhysicalJdbcTableSink.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
+
+import java.util.Optional;
+
+/**
+ * Implementation rule that convert logical JdbcTableSink to physical 
JdbcTableSink.
+ */
+public class LogicalJdbcTableSinkToPhysicalJdbcTableSink extends 
OneImplementationRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalJdbcTableSink().thenApply(ctx -> {
+            LogicalJdbcTableSink<? extends Plan> sink = ctx.root;
+            return new PhysicalJdbcTableSink<>(
+                    sink.getDatabase(),
+                    sink.getTargetTable(),
+                    sink.getCols(),
+                    sink.getOutputExprs(),
+                    Optional.empty(),
+                    sink.getLogicalProperties(),
+                    null,
+                    null,
+                    sink.child());
+        
}).toRule(RuleType.LOGICAL_JDBC_TABLE_SINK_TO_PHYSICAL_JDBC_TABLE_SINK_RULE);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index c665e4751d3..3ff217f39ef 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -49,9 +49,11 @@ public enum PlanType {
     LOGICAL_OLAP_TABLE_SINK,
     LOGICAL_HIVE_TABLE_SINK,
     LOGICAL_ICEBERG_TABLE_SINK,
+    LOGICAL_JDBC_TABLE_SINK,
     LOGICAL_RESULT_SINK,
     LOGICAL_UNBOUND_OLAP_TABLE_SINK,
     LOGICAL_UNBOUND_HIVE_TABLE_SINK,
+    LOGICAL_UNBOUND_JDBC_TABLE_SINK,
     LOGICAL_UNBOUND_RESULT_SINK,
 
     // logical others
@@ -103,6 +105,7 @@ public enum PlanType {
     PHYSICAL_OLAP_TABLE_SINK,
     PHYSICAL_HIVE_TABLE_SINK,
     PHYSICAL_ICEBERG_TABLE_SINK,
+    PHYSICAL_JDBC_TABLE_SINK,
     PHYSICAL_RESULT_SINK,
 
     // physical others
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
index 1c22b9bf56a..ef14eb914a7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
@@ -46,8 +46,8 @@ import java.util.Optional;
  * Insert executor for base external table
  */
 public abstract class BaseExternalTableInsertExecutor extends 
AbstractInsertExecutor {
+    protected static final long INVALID_TXN_ID = -1L;
     private static final Logger LOG = 
LogManager.getLogger(BaseExternalTableInsertExecutor.class);
-    private static final long INVALID_TXN_ID = -1L;
     protected long txnId = INVALID_TXN_ID;
     protected TransactionStatus txnStatus = TransactionStatus.ABORTED;
     protected final TransactionManager transactionManager;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 4e89a004bd2..e5a4eac5e39 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.TableIf;
@@ -25,12 +26,14 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.util.ProfileManager.ProfileType;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.jdbc.JdbcExternalTable;
 import org.apache.doris.load.loadv2.LoadStatistic;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.plans.Explainable;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
@@ -40,8 +43,11 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.planner.DataSink;
 import org.apache.doris.qe.ConnectContext;
@@ -52,6 +58,7 @@ import com.google.common.base.Preconditions;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 
@@ -192,9 +199,27 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
                 IcebergExternalTable icebergExternalTable = 
(IcebergExternalTable) targetTableIf;
                 insertExecutor = new IcebergInsertExecutor(ctx, 
icebergExternalTable, label, planner,
                         Optional.of(insertCtx.orElse((new 
BaseExternalTableInsertCommandContext()))), emptyInsert);
+            } else if (physicalSink instanceof PhysicalJdbcTableSink) {
+                boolean emptyInsert = childIsEmptyRelation(physicalSink);
+                List<Column> cols = ((PhysicalJdbcTableSink<?>) 
physicalSink).getCols();
+                List<Slot> slots = ((PhysicalJdbcTableSink<?>) 
physicalSink).getOutput();
+                if (physicalSink.children().size() == 1) {
+                    if (physicalSink.child(0) instanceof PhysicalOneRowRelation
+                            || physicalSink.child(0) instanceof PhysicalUnion) 
{
+                        for (int i = 0; i < cols.size(); i++) {
+                            if (!(cols.get(i).isAllowNull()) && 
slots.get(i).nullable()) {
+                                throw new AnalysisException("Column `" + 
cols.get(i).getName()
+                                        + "` is not nullable, but the inserted 
value is nullable.");
+                            }
+                        }
+                    }
+                }
+                JdbcExternalTable jdbcExternalTable = (JdbcExternalTable) 
targetTableIf;
+                insertExecutor = new JdbcInsertExecutor(ctx, 
jdbcExternalTable, label, planner,
+                        Optional.of(insertCtx.orElse((new 
JdbcInsertCommandContext()))), emptyInsert);
             } else {
                 // TODO: support other table types
-                throw new AnalysisException("insert into command only support 
[olap, hive, iceberg] table");
+                throw new AnalysisException("insert into command only support 
[olap, hive, iceberg, jdbc] table");
             }
             if (!insertExecutor.isEmptyInsert()) {
                 insertExecutor.beginTransaction();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index 48ea98ff9de..dc1fefdbff4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -27,9 +27,11 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FormatOptions;
 import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.jdbc.JdbcExternalTable;
 import org.apache.doris.nereids.analyzer.UnboundAlias;
 import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
 import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
+import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink;
 import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -260,6 +262,11 @@ public class InsertUtils {
                 throw new AnalysisException("View is not support in hive 
external table.");
             }
         }
+        if (table instanceof JdbcExternalTable) {
+            // todo:
+            // For JDBC External Table, we always allow certain columns to be 
missing during insertion
+            // Specific check for non-nullable columns only if insertion is 
direct VALUES or SELECT constants
+        }
         if (table instanceof OlapTable && ((OlapTable) table).getKeysType() == 
KeysType.UNIQUE_KEYS) {
             if (unboundLogicalSink instanceof UnboundTableSink
                     && ((UnboundTableSink<? extends Plan>) 
unboundLogicalSink).isPartialUpdate()) {
@@ -383,6 +390,8 @@ public class InsertUtils {
             unboundTableSink = (UnboundHiveTableSink<? extends Plan>) plan;
         } else if (plan instanceof UnboundIcebergTableSink) {
             unboundTableSink = (UnboundIcebergTableSink<? extends Plan>) plan;
+        } else if (plan instanceof UnboundJdbcTableSink) {
+            unboundTableSink = (UnboundJdbcTableSink<? extends Plan>) plan;
         } else {
             throw new AnalysisException("the root of plan should be"
                     + " [UnboundTableSink, UnboundHiveTableSink, 
UnboundIcebergTableSink],"
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertCommandContext.java
similarity index 81%
copy from 
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertCommandContext.java
index 2372c199738..71df7e417e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertCommandContext.java
@@ -15,10 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.transaction;
+package org.apache.doris.nereids.trees.plans.commands.insert;
 
-public enum TransactionType {
-    UNKNOWN,
-    HMS,
-    ICEBERG
+/**
+ * For iceberg External Table
+ */
+public class JdbcInsertCommandContext extends 
BaseExternalTableInsertCommandContext {
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java
new file mode 100644
index 00000000000..928b17edf38
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.datasource.jdbc.JdbcExternalTable;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.transaction.TransactionStatus;
+import org.apache.doris.transaction.TransactionType;
+
+import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Optional;
+
+/**
+ * Insert executor for jdbc table
+ */
+public class JdbcInsertExecutor extends BaseExternalTableInsertExecutor {
+    private static final Logger LOG = 
LogManager.getLogger(JdbcInsertExecutor.class);
+
+    /**
+     * constructor
+     */
+    public JdbcInsertExecutor(ConnectContext ctx, JdbcExternalTable table,
+            String labelName, NereidsPlanner planner,
+            Optional<InsertCommandContext> insertCtx,
+            boolean emptyInsert) {
+        super(ctx, table, labelName, planner, insertCtx, emptyInsert);
+    }
+
+    @Override
+    public void beginTransaction() {
+        // do nothing
+    }
+
+    @Override
+    protected void onComplete() throws UserException {
+        if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
+            LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier());
+        } else {
+            summaryProfile.ifPresent(profile -> 
profile.setTransactionBeginTime(transactionType()));
+            summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime);
+            txnStatus = TransactionStatus.COMMITTED;
+        }
+    }
+
+    @Override
+    protected void onFail(Throwable t) {
+        errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
+        String queryId = DebugUtil.printId(ctx.queryId());
+        // if any throwable being thrown during insert operation, first we 
should abort this txn
+        LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t);
+        StringBuilder sb = new StringBuilder(t.getMessage());
+        if (txnId != INVALID_TXN_ID) {
+            LOG.warn("insert [{}] with query id {} abort txn {} failed", 
labelName, queryId, txnId);
+            if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
+                sb.append(". url: ").append(coordinator.getTrackingUrl());
+            }
+        }
+        ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, t.getMessage());
+    }
+
+    @Override
+    protected void finalizeSink(PlanFragment fragment, DataSink sink, 
PhysicalSink physicalSink) {
+        // do nothing
+    }
+
+    @Override
+    protected void setCollectCommitInfoFunc() {
+        // do nothing
+    }
+
+    @Override
+    protected void doBeforeCommit() throws UserException {
+        // do nothing
+    }
+
+    @Override
+    protected TransactionType transactionType() {
+        return TransactionType.JDBC;
+    }
+
+    @Override
+    protected void beforeExec() {
+        String queryId = DebugUtil.printId(ctx.queryId());
+        LOG.info("start insert [{}] with query id {} and txn id {}", 
labelName, queryId, txnId);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJdbcTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJdbcTableSink.java
new file mode 100644
index 00000000000..b4027383916
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJdbcTableSink.java
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
+import org.apache.doris.datasource.jdbc.JdbcExternalTable;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.PropagateFuncDeps;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * logical jdbc table sink for insert command
+ */
+public class LogicalJdbcTableSink<CHILD_TYPE extends Plan> extends 
LogicalTableSink<CHILD_TYPE>
+        implements Sink, PropagateFuncDeps {
+    // bound data sink
+    private final JdbcExternalDatabase database;
+    private final JdbcExternalTable targetTable;
+    private final DMLCommandType dmlCommandType;
+
+    /**
+     * constructor
+     */
+    public LogicalJdbcTableSink(JdbcExternalDatabase database,
+                                JdbcExternalTable targetTable,
+                                List<Column> cols,
+                                List<NamedExpression> outputExprs,
+                                DMLCommandType dmlCommandType,
+                                Optional<GroupExpression> groupExpression,
+                                Optional<LogicalProperties> logicalProperties,
+                                CHILD_TYPE child) {
+        super(PlanType.LOGICAL_JDBC_TABLE_SINK, outputExprs, groupExpression, 
logicalProperties, cols, child);
+        this.database = Objects.requireNonNull(database, "database != null in 
LogicalJdbcTableSink");
+        this.targetTable = Objects.requireNonNull(targetTable, "targetTable != 
null in LogicalJdbcTableSink");
+        this.dmlCommandType = dmlCommandType;
+    }
+
+    public Plan withChildAndUpdateOutput(Plan child) {
+        List<NamedExpression> output = child.getOutput().stream()
+                .map(NamedExpression.class::cast)
+                .collect(ImmutableList.toImmutableList());
+        return new LogicalJdbcTableSink<>(database, targetTable, cols, output,
+                dmlCommandType, Optional.empty(), Optional.empty(), child);
+    }
+
+    @Override
+    public Plan withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1, 
"LogicalJdbcTableSink only accepts one child");
+        return new LogicalJdbcTableSink<>(database, targetTable, cols, 
outputExprs,
+                dmlCommandType, Optional.empty(), Optional.empty(), 
children.get(0));
+    }
+
+    @Override
+    public LogicalSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> 
outputExprs) {
+        return new LogicalJdbcTableSink<>(database, targetTable, cols, 
outputExprs,
+                dmlCommandType, Optional.empty(), Optional.empty(), child());
+    }
+
+    public JdbcExternalDatabase getDatabase() {
+        return database;
+    }
+
+    public JdbcExternalTable getTargetTable() {
+        return targetTable;
+    }
+
+    public DMLCommandType getDmlCommandType() {
+        return dmlCommandType;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof LogicalJdbcTableSink)) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        LogicalJdbcTableSink<?> that = (LogicalJdbcTableSink<?>) o;
+        return dmlCommandType == that.dmlCommandType
+                && Objects.equals(database, that.database)
+                && Objects.equals(targetTable, that.targetTable) && 
Objects.equals(cols, that.cols);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), database, targetTable, 
dmlCommandType);
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlString("LogicalJdbcTableSink[" + id.asInt() + "]",
+                "outputExprs", outputExprs,
+                "database", database.getFullName(),
+                "targetTable", targetTable.getName(),
+                "cols", cols,
+                "dmlCommandType", dmlCommandType
+        );
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitLogicalJdbcTableSink(this, context);
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
+        return new LogicalJdbcTableSink<>(database, targetTable, cols, 
outputExprs,
+                dmlCommandType, groupExpression, 
Optional.of(getLogicalProperties()), child());
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
+        return new LogicalJdbcTableSink<>(database, targetTable, cols, 
outputExprs,
+                dmlCommandType, groupExpression, logicalProperties, 
children.get(0));
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java
index 82483c63a40..7c99886f06d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java
@@ -71,6 +71,10 @@ public abstract class 
PhysicalBaseExternalTableSink<CHILD_TYPE extends Plan> ext
         return targetTable;
     }
 
+    public List<Column> getCols() {
+        return cols;
+    }
+
     @Override
     public List<? extends Expression> getExpressions() {
         return ImmutableList.of();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJdbcTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJdbcTableSink.java
new file mode 100644
index 00000000000..2b0f12c1dea
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJdbcTableSink.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
+import org.apache.doris.datasource.jdbc.JdbcExternalTable;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.statistics.Statistics;
+
+import java.util.List;
+import java.util.Optional;
+
+/** physical jdbc sink */
+public class PhysicalJdbcTableSink<CHILD_TYPE extends Plan> extends 
PhysicalBaseExternalTableSink<CHILD_TYPE> {
+
+    /**
+     * constructor
+     */
+    public PhysicalJdbcTableSink(JdbcExternalDatabase database,
+                                 JdbcExternalTable targetTable,
+                                 List<Column> cols,
+                                 List<NamedExpression> outputExprs,
+                                 Optional<GroupExpression> groupExpression,
+                                 LogicalProperties logicalProperties,
+                                 CHILD_TYPE child) {
+        this(database, targetTable, cols, outputExprs, groupExpression, 
logicalProperties,
+                PhysicalProperties.GATHER, null, child);
+    }
+
+    /**
+     * constructor
+     */
+    public PhysicalJdbcTableSink(JdbcExternalDatabase database,
+                                 JdbcExternalTable targetTable,
+                                 List<Column> cols,
+                                 List<NamedExpression> outputExprs,
+                                 Optional<GroupExpression> groupExpression,
+                                 LogicalProperties logicalProperties,
+                                 PhysicalProperties physicalProperties,
+                                 Statistics statistics,
+                                 CHILD_TYPE child) {
+        super(PlanType.PHYSICAL_JDBC_TABLE_SINK, database, targetTable, cols, 
outputExprs, groupExpression,
+                logicalProperties, physicalProperties, statistics, child);
+    }
+
+    @Override
+    public Plan withChildren(List<Plan> children) {
+        return new PhysicalJdbcTableSink<>(
+                (JdbcExternalDatabase) database, (JdbcExternalTable) 
targetTable,
+                cols, outputExprs, groupExpression,
+                getLogicalProperties(), physicalProperties, statistics, 
children.get(0));
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitPhysicalJdbcTableSink(this, context);
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
+        return new PhysicalJdbcTableSink<>(
+                (JdbcExternalDatabase) database, (JdbcExternalTable) 
targetTable, cols, outputExprs,
+                groupExpression, getLogicalProperties(), child());
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
+        return new PhysicalJdbcTableSink<>(
+                (JdbcExternalDatabase) database, (JdbcExternalTable) 
targetTable, cols, outputExprs,
+                groupExpression, logicalProperties.get(), children.get(0));
+    }
+
+    @Override
+    public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties 
physicalProperties, Statistics statistics) {
+        return new PhysicalJdbcTableSink<>(
+                (JdbcExternalDatabase) database, (JdbcExternalTable) 
targetTable, cols, outputExprs,
+                groupExpression, getLogicalProperties(), physicalProperties, 
statistics, child());
+    }
+
+    @Override
+    public PhysicalProperties getRequirePhysicalProperties() {
+        // Since JDBC tables do not have partitioning, return a default 
physical property.
+        // GATHER implies that all data is gathered to a single location, 
which is a common requirement for JDBC sinks.
+        return PhysicalProperties.GATHER;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
index e0b8a1dddc1..289687476b2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.visitor;
 
 import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
 import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
+import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink;
 import org.apache.doris.nereids.analyzer.UnboundResultSink;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.trees.plans.Plan;
@@ -26,6 +27,7 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResul
 import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
@@ -34,6 +36,7 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeRes
 import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
@@ -68,6 +71,10 @@ public interface SinkVisitor<R, C> {
         return visitLogicalSink(unboundTableSink, context);
     }
 
+    default R visitUnboundJdbcTableSink(UnboundJdbcTableSink<? extends Plan> 
unboundTableSink, C context) {
+        return visitLogicalSink(unboundTableSink, context);
+    }
+
     default R visitUnboundResultSink(UnboundResultSink<? extends Plan> 
unboundResultSink, C context) {
         return visitLogicalSink(unboundResultSink, context);
     }
@@ -96,6 +103,10 @@ public interface SinkVisitor<R, C> {
         return visitLogicalTableSink(icebergTableSink, context);
     }
 
+    default R visitLogicalJdbcTableSink(LogicalJdbcTableSink<? extends Plan> 
jdbcTableSink, C context) {
+        return visitLogicalTableSink(jdbcTableSink, context);
+    }
+
     default R visitLogicalResultSink(LogicalResultSink<? extends Plan> 
logicalResultSink, C context) {
         return visitLogicalSink(logicalResultSink, context);
     }
@@ -129,6 +140,10 @@ public interface SinkVisitor<R, C> {
         return visitPhysicalTableSink(icebergTableSink, context);
     }
 
+    default R visitPhysicalJdbcTableSink(PhysicalJdbcTableSink<? extends Plan> 
jdbcTableSink, C context) {
+        return visitPhysicalTableSink(jdbcTableSink, context);
+    }
+
     default R visitPhysicalResultSink(PhysicalResultSink<? extends Plan> 
physicalResultSink, C context) {
         return visitPhysicalSink(physicalResultSink, context);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/JdbcTransactionManager.java
similarity index 67%
copy from 
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/transaction/JdbcTransactionManager.java
index 2372c199738..a0a1cc28803 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/JdbcTransactionManager.java
@@ -17,8 +17,26 @@
 
 package org.apache.doris.transaction;
 
-public enum TransactionType {
-    UNKNOWN,
-    HMS,
-    ICEBERG
+import org.apache.doris.common.UserException;
+
+public class JdbcTransactionManager implements TransactionManager {
+    @Override
+    public long begin() {
+        return 0;
+    }
+
+    @Override
+    public void commit(long id) throws UserException {
+
+    }
+
+    @Override
+    public void rollback(long id) {
+
+    }
+
+    @Override
+    public Transaction getTransaction(long id) {
+        return null;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
index 2372c199738..c83f6188890 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
@@ -20,5 +20,6 @@ package org.apache.doris.transaction;
 public enum TransactionType {
     UNKNOWN,
     HMS,
-    ICEBERG
+    ICEBERG,
+    JDBC
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to