This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 72e0330aa1a branch-3.1: [Chore](code-clear)Unify the use of BrokerDesc
as the storage property bridge for Load (#57581)
72e0330aa1a is described below
commit 72e0330aa1a89816ebbe4a930a904e56f077349f
Author: Calvin Kirs <[email protected]>
AuthorDate: Mon Nov 3 11:17:57 2025 +0800
branch-3.1: [Chore](code-clear)Unify the use of BrokerDesc as the storage
property bridge for Load (#57581)
This change touches some code from #53013 and removes redundant property
declarations. BulkLoadDesc has no persistence or serialization
requirements, so the related implementations have been safely removed to
simplify the code.
---
.../doris/nereids/parser/LogicalPlanBuilder.java | 20 +++----------------
.../nereids/trees/plans/commands/LoadCommand.java | 23 +++++++++++-----------
2 files changed, 15 insertions(+), 28 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index e7cdd8770df..a01b73ec9af 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -410,7 +410,6 @@ import
org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVRenameInfo;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVReplaceInfo;
import org.apache.doris.nereids.trees.plans.commands.info.AlterViewInfo;
import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
-import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo;
@@ -1174,22 +1173,9 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
*/
@Override
public LogicalPlan visitLoad(DorisParser.LoadContext ctx) {
-
- BulkStorageDesc bulkDesc = null;
+ BrokerDesc brokerDesc = null;
if (ctx.withRemoteStorageSystem() != null) {
- Map<String, String> bulkProperties =
- new
HashMap<>(visitPropertyItemList(ctx.withRemoteStorageSystem().brokerProperties));
- if (ctx.withRemoteStorageSystem().S3() != null) {
- bulkDesc = new BulkStorageDesc("S3",
BulkStorageDesc.StorageType.S3, bulkProperties);
- } else if (ctx.withRemoteStorageSystem().HDFS() != null) {
- bulkDesc = new BulkStorageDesc("HDFS",
BulkStorageDesc.StorageType.HDFS, bulkProperties);
- } else if (ctx.withRemoteStorageSystem().LOCAL() != null) {
- bulkDesc = new BulkStorageDesc("LOCAL_HDFS",
BulkStorageDesc.StorageType.LOCAL, bulkProperties);
- } else if (ctx.withRemoteStorageSystem().BROKER() != null
- &&
ctx.withRemoteStorageSystem().identifierOrText().getText() != null) {
- bulkDesc = new
BulkStorageDesc(ctx.withRemoteStorageSystem().identifierOrText().getText(),
- bulkProperties);
- }
+ brokerDesc =
visitWithRemoteStorageSystem(ctx.withRemoteStorageSystem());
}
ImmutableList.Builder<BulkLoadDataDesc> dataDescriptions = new
ImmutableList.Builder<>();
List<String> labelParts = visitMultipartIdentifier(ctx.lableName);
@@ -1270,7 +1256,7 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
String commentSpec = ctx.commentSpec() == null ? "''" :
ctx.commentSpec().STRING_LITERAL().getText();
String comment =
LogicalPlanBuilderAssistant.escapeBackSlash(commentSpec.substring(1,
commentSpec.length() - 1));
- return new LoadCommand(labelName, dataDescriptions.build(), bulkDesc,
properties, comment);
+ return new LoadCommand(labelName, dataDescriptions.build(),
brokerDesc, properties, comment);
}
/*
********************************************************************************************
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
index 4ccb8916cd8..7d592af0632 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
@@ -17,7 +17,9 @@
package org.apache.doris.nereids.trees.plans.commands;
+import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StmtType;
+import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
@@ -49,7 +51,6 @@ import
org.apache.doris.nereids.trees.expressions.functions.scalar.If;
import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
-import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
@@ -89,7 +90,7 @@ public class LoadCommand extends Command implements
ForwardWithSync {
public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
private final String labelName;
- private final BulkStorageDesc bulkStorageDesc;
+ private final BrokerDesc brokerDesc;
private final Set<String> sinkTableNames = new HashSet<>();
private final List<BulkLoadDataDesc> sourceInfos;
private final Map<String, String> properties;
@@ -100,13 +101,13 @@ public class LoadCommand extends Command implements
ForwardWithSync {
/**
* constructor of ExportCommand
*/
- public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos,
BulkStorageDesc bulkStorageDesc,
+ public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos,
BrokerDesc brokerDesc,
Map<String, String> properties, String comment) {
super(PlanType.LOAD_COMMAND);
this.labelName = Objects.requireNonNull(labelName.trim(), "labelName
should not null");
this.sourceInfos =
Objects.requireNonNull(ImmutableList.copyOf(sourceInfos), "sourceInfos should
not null");
this.properties =
Objects.requireNonNull(ImmutableMap.copyOf(properties), "properties should not
null");
- this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc,
"bulkStorageDesc should not null");
+ this.brokerDesc = Objects.requireNonNull(brokerDesc, "brokerDesc
should not null");
this.comment = Objects.requireNonNull(comment, "comment should not
null");
}
@@ -151,7 +152,7 @@ public class LoadCommand extends Command implements
ForwardWithSync {
LOG.debug("nereids load stmt before conversion: {}",
dataDesc::toSql);
}
// 1. build source projects plan (select col1,col2... from tvf where
prefilter)
- Map<String, String> tvfProperties = getTvfProperties(dataDesc,
bulkStorageDesc);
+ Map<String, String> tvfProperties = getTvfProperties(dataDesc,
brokerDesc);
LogicalPlan tvfLogicalPlan = new
LogicalCheckPolicy<>(getUnboundTVFRelation(tvfProperties));
tvfLogicalPlan = buildTvfQueryPlan(dataDesc, tvfProperties,
tvfLogicalPlan);
@@ -431,15 +432,15 @@ public class LoadCommand extends Command implements
ForwardWithSync {
private UnboundTVFRelation getUnboundTVFRelation(Map<String, String>
properties) {
UnboundTVFRelation relation;
- if (bulkStorageDesc.getStorageType() ==
BulkStorageDesc.StorageType.S3) {
+ if (brokerDesc.getStorageType() == StorageBackend.StorageType.S3) {
relation = new
UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(),
S3TableValuedFunction.NAME, new Properties(properties));
- } else if (bulkStorageDesc.getStorageType() ==
BulkStorageDesc.StorageType.HDFS) {
+ } else if (brokerDesc.getStorageType() ==
StorageBackend.StorageType.HDFS) {
relation = new
UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(),
HdfsTableValuedFunction.NAME, new Properties(properties));
} else {
throw new UnsupportedOperationException("Unsupported load storage
type: "
- + bulkStorageDesc.getStorageType());
+ + brokerDesc.getStorageType());
}
return relation;
}
@@ -454,8 +455,8 @@ public class LoadCommand extends Command implements
ForwardWithSync {
return targetTable;
}
- private static Map<String, String> getTvfProperties(BulkLoadDataDesc
dataDesc, BulkStorageDesc bulkStorageDesc) {
- Map<String, String> tvfProperties = new
HashMap<>(bulkStorageDesc.getProperties());
+ private static Map<String, String> getTvfProperties(BulkLoadDataDesc
dataDesc, BrokerDesc brokerDesc) {
+ Map<String, String> tvfProperties = new
HashMap<>(brokerDesc.getProperties());
String fileFormat =
dataDesc.getFormatDesc().getFileFormat().orElse("csv");
if ("csv".equalsIgnoreCase(fileFormat)) {
dataDesc.getFormatDesc().getColumnSeparator().ifPresent(sep ->
@@ -469,7 +470,7 @@ public class LoadCommand extends Command implements
ForwardWithSync {
List<String> filePaths = dataDesc.getFilePaths();
// TODO: support multi location by union
String listFilePath = filePaths.get(0);
- if (bulkStorageDesc.getStorageType() ==
BulkStorageDesc.StorageType.S3) {
+ if (brokerDesc.getStorageType() == StorageBackend.StorageType.S3) {
// TODO: check file path by s3 fs list status
tvfProperties.put("uri", listFilePath);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]