This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new f3a42faf406 [branch-2.1][improvement](jdbc catalog) support jdbc external catalog insert stmt in nereids (#40902) f3a42faf406 is described below commit f3a42faf4064695f6df781a62d62afff9e1ca0fb Author: zy-kkk <zhongy...@gmail.com> AuthorDate: Wed Sep 18 14:02:20 2024 +0800 [branch-2.1][improvement](jdbc catalog) support jdbc external catalog insert stmt in nereids (#40902) pick (#39813) --- .../nereids/analyzer/UnboundJdbcTableSink.java | 84 ++++++++++++ .../nereids/analyzer/UnboundTableSinkCreator.java | 18 +-- .../glue/translator/PhysicalPlanTranslator.java | 20 +++ .../pre/TurnOffPageCacheForInsertIntoSelect.java | 8 ++ .../nereids/properties/RequestPropertyDeriver.java | 9 ++ .../org/apache/doris/nereids/rules/RuleSet.java | 2 + .../org/apache/doris/nereids/rules/RuleType.java | 2 + .../doris/nereids/rules/analysis/BindSink.java | 77 ++++++++++- ...ogicalJdbcTableSinkToPhysicalJdbcTableSink.java | 48 +++++++ .../apache/doris/nereids/trees/plans/PlanType.java | 3 + .../insert/BaseExternalTableInsertExecutor.java | 2 +- .../commands/insert/InsertIntoTableCommand.java | 27 +++- .../trees/plans/commands/insert/InsertUtils.java | 9 ++ .../commands/insert/JdbcInsertCommandContext.java} | 10 +- .../plans/commands/insert/JdbcInsertExecutor.java | 113 +++++++++++++++ .../trees/plans/logical/LogicalJdbcTableSink.java | 151 +++++++++++++++++++++ .../physical/PhysicalBaseExternalTableSink.java | 4 + .../plans/physical/PhysicalJdbcTableSink.java | 109 +++++++++++++++ .../nereids/trees/plans/visitor/SinkVisitor.java | 15 ++ ...actionType.java => JdbcTransactionManager.java} | 26 +++- .../apache/doris/transaction/TransactionType.java | 3 +- 21 files changed, 718 insertions(+), 22 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundJdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundJdbcTableSink.java new file mode 100644 index 00000000000..53367cf9c21 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundJdbcTableSink.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.analyzer; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Optional; + +/** + * Represent an jdbc table sink plan node that has not been bound. + */ +public class UnboundJdbcTableSink<CHILD_TYPE extends Plan> extends UnboundBaseExternalTableSink<CHILD_TYPE> { + + public UnboundJdbcTableSink(List<String> nameParts, List<String> colNames, List<String> hints, + List<String> partitions, CHILD_TYPE child) { + this(nameParts, colNames, hints, partitions, DMLCommandType.NONE, + Optional.empty(), Optional.empty(), child); + } + + /** + * constructor + */ + public UnboundJdbcTableSink(List<String> nameParts, + List<String> colNames, + List<String> hints, + List<String> partitions, + DMLCommandType dmlCommandType, + Optional<GroupExpression> groupExpression, + Optional<LogicalProperties> logicalProperties, + CHILD_TYPE child) { + super(nameParts, PlanType.LOGICAL_UNBOUND_JDBC_TABLE_SINK, ImmutableList.of(), groupExpression, + logicalProperties, colNames, dmlCommandType, child, hints, partitions); + } + + @Override + public Plan withChildren(List<Plan> children) { + Preconditions.checkArgument(children.size() == 1, + "UnboundJdbcTableSink should have exactly one child"); + return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(), children.get(0)); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitUnboundJdbcTableSink(this, context); + } + + @Override + public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { + return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, + Optional<LogicalProperties> logicalProperties, List<Plan> children) { + return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, groupExpression, logicalProperties, children.get(0)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java index e1c771b6a4c..8ca58f97757 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java @@ -23,6 +23,7 @@ import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.datasource.jdbc.JdbcExternalCatalog; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.exceptions.ParseException; import org.apache.doris.nereids.trees.plans.Plan; @@ -78,6 +79,9 @@ public class UnboundTableSinkCreator { } else if (curCatalog instanceof IcebergExternalCatalog) { return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions, dmlCommandType, Optional.empty(), Optional.empty(), plan); + } else if (curCatalog instanceof JdbcExternalCatalog) { + return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(), plan); } throw new RuntimeException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported."); } @@ -109,16 +113,12 @@ public class UnboundTableSinkCreator { dmlCommandType, Optional.empty(), Optional.empty(), plan); } else if (curCatalog instanceof IcebergExternalCatalog && !isAutoDetectPartition) { return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions, - dmlCommandType, Optional.empty(), Optional.empty(), plan); - } - // TODO: we need to support insert into other catalog - try { - if (ConnectContext.get() != null) { - ConnectContext.get().getSessionVariable().enableFallbackToOriginalPlannerOnce(); - } - } catch (Exception e) { - // ignore this. + dmlCommandType, Optional.empty(), Optional.empty(), plan); + } else if (curCatalog instanceof JdbcExternalCatalog) { + return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(), plan); } + throw new AnalysisException( (isOverwrite ? "insert overwrite" : "insert") + " data to " + curCatalog.getClass().getSimpleName() + " is not supported." diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 78a6a083090..c2de6c0818d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -57,6 +57,7 @@ import org.apache.doris.datasource.hudi.source.HudiScanNode; import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.iceberg.source.IcebergScanNode; import org.apache.doris.datasource.jdbc.JdbcExternalTable; +import org.apache.doris.datasource.jdbc.sink.JdbcTableSink; import org.apache.doris.datasource.jdbc.source.JdbcScanNode; import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable; import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode; @@ -125,6 +126,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHudiScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect; import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan; @@ -489,6 +491,24 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla return rootFragment; } + @Override + public PlanFragment visitPhysicalJdbcTableSink(PhysicalJdbcTableSink<? extends Plan> jdbcTableSink, + PlanTranslatorContext context) { + PlanFragment rootFragment = jdbcTableSink.child().accept(this, context); + rootFragment.setOutputPartition(DataPartition.UNPARTITIONED); + List<Column> targetTableColumns = jdbcTableSink.getCols(); + List<String> insertCols = targetTableColumns.stream() + .map(Column::getName) + .collect(Collectors.toList()); + + JdbcTableSink sink = new JdbcTableSink( + ((JdbcExternalTable) jdbcTableSink.getTargetTable()).getJdbcTable(), + insertCols + ); + rootFragment.setSink(sink); + return rootFragment; + } + @Override public PlanFragment visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink, PlanTranslatorContext context) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java index ab817c2f1d7..2479af68fbe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java @@ -26,6 +26,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink; import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.VariableMgr; @@ -67,6 +68,13 @@ public class TurnOffPageCacheForInsertIntoSelect extends PlanPreprocessor { return tableSink; } + @Override + public Plan visitLogicalJdbcTableSink( + LogicalJdbcTableSink<? extends Plan> tableSink, StatementContext context) { + turnOffPageCache(context); + return tableSink; + } + private void turnOffPageCache(StatementContext context) { SessionVariable sessionVariable = context.getConnectContext().getSessionVariable(); // set temporary session value, and then revert value in the 'finally block' of StmtExecutor#execute diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java index 750707c52c4..ee3d8ee3124 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java @@ -40,6 +40,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; @@ -152,6 +153,14 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> { return null; } + @Override + public Void visitPhysicalJdbcTableSink( + PhysicalJdbcTableSink<? extends Plan> jdbcTableSink, PlanContext context) { + // Always use gather properties for jdbcTableSink + addRequestPropertyToChildren(PhysicalProperties.GATHER); + return null; + } + @Override public Void visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, PlanContext context) { addRequestPropertyToChildren(PhysicalProperties.GATHER); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java index 26920764d89..26dfa1bfcb7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java @@ -76,6 +76,7 @@ import org.apache.doris.nereids.rules.implementation.LogicalHudiScanToPhysicalHu import org.apache.doris.nereids.rules.implementation.LogicalIcebergTableSinkToPhysicalIcebergTableSink; import org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect; import org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan; +import org.apache.doris.nereids.rules.implementation.LogicalJdbcTableSinkToPhysicalJdbcTableSink; import org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin; import org.apache.doris.nereids.rules.implementation.LogicalJoinToNestedLoopJoin; import org.apache.doris.nereids.rules.implementation.LogicalLimitToPhysicalLimit; @@ -202,6 +203,7 @@ public class RuleSet { .add(new LogicalOlapTableSinkToPhysicalOlapTableSink()) .add(new LogicalHiveTableSinkToPhysicalHiveTableSink()) .add(new LogicalIcebergTableSinkToPhysicalIcebergTableSink()) + .add(new LogicalJdbcTableSinkToPhysicalJdbcTableSink()) .add(new LogicalFileSinkToPhysicalFileSink()) .add(new LogicalResultSinkToPhysicalResultSink()) .add(new LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 082ee72fbed..b2d84679f23 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -32,6 +32,7 @@ public enum RuleType { BINDING_RESULT_SINK(RuleTypeClass.REWRITE), BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE), BINDING_INSERT_ICEBERG_TABLE(RuleTypeClass.REWRITE), + BINDING_INSERT_JDBC_TABLE(RuleTypeClass.REWRITE), BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE), INIT_MATERIALIZATION_HOOK_FOR_FILE_SINK(RuleTypeClass.REWRITE), INIT_MATERIALIZATION_HOOK_FOR_TABLE_SINK(RuleTypeClass.REWRITE), @@ -428,6 +429,7 @@ public enum RuleType { LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), + LOGICAL_JDBC_TABLE_SINK_TO_PHYSICAL_JDBC_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java index 793ed5cc8f8..4a9660a5144 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -31,10 +31,13 @@ import org.apache.doris.datasource.hive.HMSExternalDatabase; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.iceberg.IcebergExternalDatabase; import org.apache.doris.datasource.iceberg.IcebergExternalTable; +import org.apache.doris.datasource.jdbc.JdbcExternalDatabase; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.analyzer.Scope; import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink; +import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink; import org.apache.doris.nereids.analyzer.UnboundSlot; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -57,6 +60,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; @@ -108,7 +112,8 @@ public class BindSink implements AnalysisRuleFactory { // TODO: bind hive taget table RuleType.BINDING_INSERT_HIVE_TABLE.build(unboundHiveTableSink().thenApply(this::bindHiveTableSink)), RuleType.BINDING_INSERT_ICEBERG_TABLE.build( - unboundIcebergTableSink().thenApply(this::bindIcebergTableSink)) + unboundIcebergTableSink().thenApply(this::bindIcebergTableSink)), + RuleType.BINDING_INSERT_JDBC_TABLE.build(unboundJdbcTableSink().thenApply(this::bindJdbcTableSink)) ); } @@ -502,6 +507,64 @@ public class BindSink implements AnalysisRuleFactory { return boundSink.withChildAndUpdateOutput(fullOutputProject); } + private Plan bindJdbcTableSink(MatchingContext<UnboundJdbcTableSink<Plan>> ctx) { + UnboundJdbcTableSink<?> sink = ctx.root; + Pair<JdbcExternalDatabase, JdbcExternalTable> pair = bind(ctx.cascadesContext, sink); + JdbcExternalDatabase database = pair.first; + JdbcExternalTable table = pair.second; + LogicalPlan child = ((LogicalPlan) sink.child()); + + List<Column> bindColumns; + if (sink.getColNames().isEmpty()) { + bindColumns = table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList()); + } else { + bindColumns = sink.getColNames().stream().map(cn -> { + Column column = table.getColumn(cn); + if (column == null) { + throw new AnalysisException(String.format("column %s is not found in table %s", + cn, table.getName())); + } + return column; + }).collect(ImmutableList.toImmutableList()); + } + LogicalJdbcTableSink<?> boundSink = new LogicalJdbcTableSink<>( + database, + table, + bindColumns, + child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()), + sink.getDMLCommandType(), + Optional.empty(), + Optional.empty(), + child); + // we need to insert all the columns of the target table + if (boundSink.getCols().size() != child.getOutput().size()) { + throw new AnalysisException("insert into cols should be corresponding to the query output"); + } + Map<String, NamedExpression> columnToOutput = getJdbcColumnToOutput(bindColumns, child); + // We don't need to insert unmentioned columns, only user specified columns + LogicalProject<?> outputProject = getOutputProjectByCoercion(bindColumns, child, columnToOutput); + return boundSink.withChildAndUpdateOutput(outputProject); + } + + private static Map<String, NamedExpression> getJdbcColumnToOutput( + List<Column> bindColumns, LogicalPlan child) { + Map<String, NamedExpression> columnToOutput = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + + for (int i = 0; i < bindColumns.size(); i++) { + Column column = bindColumns.get(i); + NamedExpression outputExpr = child.getOutput().get(i); + Alias output = new Alias( + TypeCoercionUtils.castIfNotSameType(outputExpr, DataType.fromCatalogType(column.getType())), + column.getName() + ); + columnToOutput.put(column.getName(), output); + } + + return columnToOutput; + } + private Pair<Database, OlapTable> bind(CascadesContext cascadesContext, UnboundTableSink<? extends Plan> sink) { List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), sink.getNameParts()); @@ -545,6 +608,18 @@ public class BindSink implements AnalysisRuleFactory { throw new AnalysisException("the target table of insert into is not an iceberg table"); } + private Pair<JdbcExternalDatabase, JdbcExternalTable> bind(CascadesContext cascadesContext, + UnboundJdbcTableSink<? extends Plan> sink) { + List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), + sink.getNameParts()); + Pair<DatabaseIf<?>, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier, + cascadesContext.getConnectContext().getEnv()); + if (pair.second instanceof JdbcExternalTable) { + return Pair.of(((JdbcExternalDatabase) pair.first), (JdbcExternalTable) pair.second); + } + throw new AnalysisException("the target table of insert into is not an jdbc table"); + } + private List<Long> bindPartitionIds(OlapTable table, List<String> partitions, boolean temp) { return partitions.isEmpty() ? ImmutableList.of() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJdbcTableSinkToPhysicalJdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJdbcTableSinkToPhysicalJdbcTableSink.java new file mode 100644 index 00000000000..960350c6117 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJdbcTableSinkToPhysicalJdbcTableSink.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.implementation; + +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; + +import java.util.Optional; + +/** + * Implementation rule that convert logical JdbcTableSink to physical JdbcTableSink. + */ +public class LogicalJdbcTableSinkToPhysicalJdbcTableSink extends OneImplementationRuleFactory { + @Override + public Rule build() { + return logicalJdbcTableSink().thenApply(ctx -> { + LogicalJdbcTableSink<? extends Plan> sink = ctx.root; + return new PhysicalJdbcTableSink<>( + sink.getDatabase(), + sink.getTargetTable(), + sink.getCols(), + sink.getOutputExprs(), + Optional.empty(), + sink.getLogicalProperties(), + null, + null, + sink.child()); + }).toRule(RuleType.LOGICAL_JDBC_TABLE_SINK_TO_PHYSICAL_JDBC_TABLE_SINK_RULE); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index c665e4751d3..3ff217f39ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -49,9 +49,11 @@ public enum PlanType { LOGICAL_OLAP_TABLE_SINK, LOGICAL_HIVE_TABLE_SINK, LOGICAL_ICEBERG_TABLE_SINK, + LOGICAL_JDBC_TABLE_SINK, LOGICAL_RESULT_SINK, LOGICAL_UNBOUND_OLAP_TABLE_SINK, LOGICAL_UNBOUND_HIVE_TABLE_SINK, + LOGICAL_UNBOUND_JDBC_TABLE_SINK, LOGICAL_UNBOUND_RESULT_SINK, // logical others @@ -103,6 +105,7 @@ public enum PlanType { PHYSICAL_OLAP_TABLE_SINK, PHYSICAL_HIVE_TABLE_SINK, PHYSICAL_ICEBERG_TABLE_SINK, + PHYSICAL_JDBC_TABLE_SINK, PHYSICAL_RESULT_SINK, // physical others diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java index 1c22b9bf56a..ef14eb914a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java @@ -46,8 +46,8 @@ import java.util.Optional; * Insert executor for base external table */ public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExecutor { + protected static final long INVALID_TXN_ID = -1L; private static final Logger LOG = LogManager.getLogger(BaseExternalTableInsertExecutor.class); - private static final long INVALID_TXN_ID = -1L; protected long txnId = INVALID_TXN_ID; protected TransactionStatus txnStatus = TransactionStatus.ABORTED; protected final TransactionManager transactionManager; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 4e89a004bd2..e5a4eac5e39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; @@ -25,12 +26,14 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.util.ProfileManager.ProfileType; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.iceberg.IcebergExternalTable; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Explainable; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; @@ -40,8 +43,11 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.planner.DataSink; import org.apache.doris.qe.ConnectContext; @@ -52,6 +58,7 @@ import com.google.common.base.Preconditions; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; import java.util.Objects; import java.util.Optional; @@ -192,9 +199,27 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, IcebergExternalTable icebergExternalTable = (IcebergExternalTable) targetTableIf; insertExecutor = new IcebergInsertExecutor(ctx, icebergExternalTable, label, planner, Optional.of(insertCtx.orElse((new BaseExternalTableInsertCommandContext()))), emptyInsert); + } else if (physicalSink instanceof PhysicalJdbcTableSink) { + boolean emptyInsert = childIsEmptyRelation(physicalSink); + List<Column> cols = ((PhysicalJdbcTableSink<?>) physicalSink).getCols(); + List<Slot> slots = ((PhysicalJdbcTableSink<?>) physicalSink).getOutput(); + if (physicalSink.children().size() == 1) { + if (physicalSink.child(0) instanceof PhysicalOneRowRelation + || physicalSink.child(0) instanceof PhysicalUnion) { + for (int i = 0; i < cols.size(); i++) { + if (!(cols.get(i).isAllowNull()) && slots.get(i).nullable()) { + throw new AnalysisException("Column `" + cols.get(i).getName() + + "` is not nullable, but the inserted value is nullable."); + } + } + } + } + JdbcExternalTable jdbcExternalTable = (JdbcExternalTable) targetTableIf; + insertExecutor = new JdbcInsertExecutor(ctx, jdbcExternalTable, label, planner, + Optional.of(insertCtx.orElse((new JdbcInsertCommandContext()))), emptyInsert); } else { // TODO: support other table types - throw new AnalysisException("insert into command only support [olap, hive, iceberg] table"); + throw new AnalysisException("insert into command only support [olap, hive, iceberg, jdbc] table"); } if (!insertExecutor.isEmptyInsert()) { insertExecutor.beginTransaction(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index 48ea98ff9de..dc1fefdbff4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -27,9 +27,11 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Config; import org.apache.doris.common.FormatOptions; import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; import org.apache.doris.nereids.analyzer.UnboundAlias; import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink; +import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink; import org.apache.doris.nereids.analyzer.UnboundOneRowRelation; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -260,6 +262,11 @@ public class InsertUtils { throw new AnalysisException("View is not support in hive external table."); } } + if (table instanceof JdbcExternalTable) { + // todo: + // For JDBC External Table, we always allow certain columns to be missing during insertion + // Specific check for non-nullable columns only if insertion is direct VALUES or SELECT constants + } if (table instanceof OlapTable && ((OlapTable) table).getKeysType() == KeysType.UNIQUE_KEYS) { if (unboundLogicalSink instanceof UnboundTableSink && ((UnboundTableSink<? extends Plan>) unboundLogicalSink).isPartialUpdate()) { @@ -383,6 +390,8 @@ public class InsertUtils { unboundTableSink = (UnboundHiveTableSink<? extends Plan>) plan; } else if (plan instanceof UnboundIcebergTableSink) { unboundTableSink = (UnboundIcebergTableSink<? extends Plan>) plan; + } else if (plan instanceof UnboundJdbcTableSink) { + unboundTableSink = (UnboundJdbcTableSink<? extends Plan>) plan; } else { throw new AnalysisException("the root of plan should be" + " [UnboundTableSink, UnboundHiveTableSink, UnboundIcebergTableSink]," diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertCommandContext.java similarity index 81% copy from fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertCommandContext.java index 2372c199738..71df7e417e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertCommandContext.java @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.transaction; +package org.apache.doris.nereids.trees.plans.commands.insert; -public enum TransactionType { - UNKNOWN, - HMS, - ICEBERG +/** + * For iceberg External Table + */ +public class JdbcInsertCommandContext extends BaseExternalTableInsertCommandContext { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java new file mode 100644 index 00000000000..928b17edf38 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.UserException; +import org.apache.doris.common.profile.SummaryProfile; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; +import org.apache.doris.planner.DataSink; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.QueryState; +import org.apache.doris.transaction.TransactionStatus; +import org.apache.doris.transaction.TransactionType; + +import com.google.common.base.Strings; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Optional; + +/** + * Insert executor for jdbc table + */ +public class JdbcInsertExecutor extends BaseExternalTableInsertExecutor { + private static final Logger LOG = LogManager.getLogger(JdbcInsertExecutor.class); + + /** + * constructor + */ + public JdbcInsertExecutor(ConnectContext ctx, JdbcExternalTable table, + String labelName, NereidsPlanner planner, + Optional<InsertCommandContext> insertCtx, + boolean emptyInsert) { + super(ctx, table, labelName, planner, insertCtx, emptyInsert); + } + + @Override + public void beginTransaction() { + // do nothing + } + + @Override + protected void onComplete() throws UserException { + if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) { + LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier()); + } else { + summaryProfile.ifPresent(profile -> profile.setTransactionBeginTime(transactionType())); + summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime); + txnStatus = TransactionStatus.COMMITTED; + } + } + + @Override + protected void onFail(Throwable t) { + errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage(); + String queryId = DebugUtil.printId(ctx.queryId()); + // if any throwable being thrown during insert operation, first we should abort this txn + LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t); + StringBuilder sb = new StringBuilder(t.getMessage()); + if (txnId != INVALID_TXN_ID) { + LOG.warn("insert [{}] with query id {} abort txn {} failed", labelName, queryId, txnId); + if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) { + sb.append(". url: ").append(coordinator.getTrackingUrl()); + } + } + ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, t.getMessage()); + } + + @Override + protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) { + // do nothing + } + + @Override + protected void setCollectCommitInfoFunc() { + // do nothing + } + + @Override + protected void doBeforeCommit() throws UserException { + // do nothing + } + + @Override + protected TransactionType transactionType() { + return TransactionType.JDBC; + } + + @Override + protected void beforeExec() { + String queryId = DebugUtil.printId(ctx.queryId()); + LOG.info("start insert [{}] with query id {} and txn id {}", labelName, queryId, txnId); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJdbcTableSink.java new file mode 100644 index 00000000000..b4027383916 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJdbcTableSink.java @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.jdbc.JdbcExternalDatabase; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.PropagateFuncDeps; +import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * logical jdbc table sink for insert command + */ +public class LogicalJdbcTableSink<CHILD_TYPE extends Plan> extends LogicalTableSink<CHILD_TYPE> + implements Sink, PropagateFuncDeps { + // bound data sink + private final JdbcExternalDatabase database; + private final JdbcExternalTable targetTable; + private final DMLCommandType dmlCommandType; + + /** + * constructor + */ + public LogicalJdbcTableSink(JdbcExternalDatabase database, + JdbcExternalTable targetTable, + List<Column> cols, + List<NamedExpression> outputExprs, + DMLCommandType dmlCommandType, + Optional<GroupExpression> groupExpression, + Optional<LogicalProperties> logicalProperties, + CHILD_TYPE child) { + super(PlanType.LOGICAL_JDBC_TABLE_SINK, outputExprs, groupExpression, logicalProperties, cols, child); + this.database = Objects.requireNonNull(database, "database != null in LogicalJdbcTableSink"); + this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalJdbcTableSink"); + this.dmlCommandType = dmlCommandType; + } + + public Plan withChildAndUpdateOutput(Plan child) { + List<NamedExpression> output = child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()); + return new LogicalJdbcTableSink<>(database, targetTable, cols, output, + dmlCommandType, Optional.empty(), Optional.empty(), child); + } + + @Override + public Plan withChildren(List<Plan> children) { + Preconditions.checkArgument(children.size() == 1, "LogicalJdbcTableSink only accepts one child"); + return new LogicalJdbcTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, Optional.empty(), Optional.empty(), children.get(0)); + } + + @Override + public LogicalSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) { + return new LogicalJdbcTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, Optional.empty(), Optional.empty(), child()); + } + + public JdbcExternalDatabase getDatabase() { + return database; + } + + public JdbcExternalTable getTargetTable() { + return targetTable; + } + + public DMLCommandType getDmlCommandType() { + return dmlCommandType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof LogicalJdbcTableSink)) { + return false; + } + if (!super.equals(o)) { + return false; + } + LogicalJdbcTableSink<?> that = (LogicalJdbcTableSink<?>) o; + return dmlCommandType == that.dmlCommandType + && Objects.equals(database, that.database) + && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), database, targetTable, dmlCommandType); + } + + @Override + public String toString() { + return Utils.toSqlString("LogicalJdbcTableSink[" + id.asInt() + "]", + "outputExprs", outputExprs, + "database", database.getFullName(), + "targetTable", targetTable.getName(), + "cols", cols, + "dmlCommandType", dmlCommandType + ); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitLogicalJdbcTableSink(this, context); + } + + @Override + public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { + return new LogicalJdbcTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, + Optional<LogicalProperties> logicalProperties, List<Plan> children) { + return new LogicalJdbcTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, groupExpression, logicalProperties, children.get(0)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java index 82483c63a40..7c99886f06d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java @@ -71,6 +71,10 @@ public abstract class PhysicalBaseExternalTableSink<CHILD_TYPE extends Plan> ext return targetTable; } + public List<Column> getCols() { + return cols; + } + @Override public List<? extends Expression> getExpressions() { return ImmutableList.of(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJdbcTableSink.java new file mode 100644 index 00000000000..2b0f12c1dea --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJdbcTableSink.java @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.jdbc.JdbcExternalDatabase; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.statistics.Statistics; + +import java.util.List; +import java.util.Optional; + +/** physical jdbc sink */ +public class PhysicalJdbcTableSink<CHILD_TYPE extends Plan> extends PhysicalBaseExternalTableSink<CHILD_TYPE> { + + /** + * constructor + */ + public PhysicalJdbcTableSink(JdbcExternalDatabase database, + JdbcExternalTable targetTable, + List<Column> cols, + List<NamedExpression> outputExprs, + Optional<GroupExpression> groupExpression, + LogicalProperties logicalProperties, + CHILD_TYPE child) { + this(database, targetTable, cols, outputExprs, groupExpression, logicalProperties, + PhysicalProperties.GATHER, null, child); + } + + /** + * constructor + */ + public PhysicalJdbcTableSink(JdbcExternalDatabase database, + JdbcExternalTable targetTable, + List<Column> cols, + List<NamedExpression> outputExprs, + Optional<GroupExpression> groupExpression, + LogicalProperties logicalProperties, + PhysicalProperties physicalProperties, + Statistics statistics, + CHILD_TYPE child) { + super(PlanType.PHYSICAL_JDBC_TABLE_SINK, database, targetTable, cols, outputExprs, groupExpression, + logicalProperties, physicalProperties, statistics, child); + } + + @Override + public Plan withChildren(List<Plan> children) { + return new PhysicalJdbcTableSink<>( + (JdbcExternalDatabase) database, (JdbcExternalTable) targetTable, + cols, outputExprs, groupExpression, + getLogicalProperties(), physicalProperties, statistics, children.get(0)); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitPhysicalJdbcTableSink(this, context); + } + + @Override + public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { + return new PhysicalJdbcTableSink<>( + (JdbcExternalDatabase) database, (JdbcExternalTable) targetTable, cols, outputExprs, + groupExpression, getLogicalProperties(), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, + Optional<LogicalProperties> logicalProperties, List<Plan> children) { + return new PhysicalJdbcTableSink<>( + (JdbcExternalDatabase) database, (JdbcExternalTable) targetTable, cols, outputExprs, + groupExpression, logicalProperties.get(), children.get(0)); + } + + @Override + public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { + return new PhysicalJdbcTableSink<>( + (JdbcExternalDatabase) database, (JdbcExternalTable) targetTable, cols, outputExprs, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + } + + @Override + public PhysicalProperties getRequirePhysicalProperties() { + // Since JDBC tables do not have partitioning, return a default physical property. + // GATHER implies that all data is gathered to a single location, which is a common requirement for JDBC sinks. + return PhysicalProperties.GATHER; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java index e0b8a1dddc1..289687476b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java @@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.visitor; import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink; +import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink; import org.apache.doris.nereids.analyzer.UnboundResultSink; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.trees.plans.Plan; @@ -26,6 +27,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResul import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink; import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink; import org.apache.doris.nereids.trees.plans.logical.LogicalSink; @@ -34,6 +36,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeRes import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; @@ -68,6 +71,10 @@ public interface SinkVisitor<R, C> { return visitLogicalSink(unboundTableSink, context); } + default R visitUnboundJdbcTableSink(UnboundJdbcTableSink<? extends Plan> unboundTableSink, C context) { + return visitLogicalSink(unboundTableSink, context); + } + default R visitUnboundResultSink(UnboundResultSink<? extends Plan> unboundResultSink, C context) { return visitLogicalSink(unboundResultSink, context); } @@ -96,6 +103,10 @@ public interface SinkVisitor<R, C> { return visitLogicalTableSink(icebergTableSink, context); } + default R visitLogicalJdbcTableSink(LogicalJdbcTableSink<? extends Plan> jdbcTableSink, C context) { + return visitLogicalTableSink(jdbcTableSink, context); + } + default R visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResultSink, C context) { return visitLogicalSink(logicalResultSink, context); } @@ -129,6 +140,10 @@ public interface SinkVisitor<R, C> { return visitPhysicalTableSink(icebergTableSink, context); } + default R visitPhysicalJdbcTableSink(PhysicalJdbcTableSink<? extends Plan> jdbcTableSink, C context) { + return visitPhysicalTableSink(jdbcTableSink, context); + } + default R visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, C context) { return visitPhysicalSink(physicalResultSink, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/JdbcTransactionManager.java similarity index 67% copy from fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java copy to fe/fe-core/src/main/java/org/apache/doris/transaction/JdbcTransactionManager.java index 2372c199738..a0a1cc28803 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/JdbcTransactionManager.java @@ -17,8 +17,26 @@ package org.apache.doris.transaction; -public enum TransactionType { - UNKNOWN, - HMS, - ICEBERG +import org.apache.doris.common.UserException; + +public class JdbcTransactionManager implements TransactionManager { + @Override + public long begin() { + return 0; + } + + @Override + public void commit(long id) throws UserException { + + } + + @Override + public void rollback(long id) { + + } + + @Override + public Transaction getTransaction(long id) { + return null; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java index 2372c199738..c83f6188890 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java @@ -20,5 +20,6 @@ package org.apache.doris.transaction; public enum TransactionType { UNKNOWN, HMS, - ICEBERG + ICEBERG, + JDBC } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org