starocean999 commented on code in PR #49109:
URL: https://github.com/apache/doris/pull/49109#discussion_r2184609834


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/MysqlLoadCommand.java:
##########
@@ -0,0 +1,332 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.load;
+
+import org.apache.doris.analysis.StmtType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.LoadJobRowResult;
+import org.apache.doris.load.loadv2.LoadManager;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.Command;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+// LOAD command, load files into tables.
+//
+// syntax:
+// LOAD mysqlDataDesc
+// [PROPERTIES (key1=value1, )]
+// [commentSpec]
+//
+// mysqlDataDesc:
+//    DATA [ LOCAL ]
+//    INFILE "<file_name>"
+//    INTO TABLE "<tbl_name>"
+//    [ PARTITION (<partition_name> [, ... ]) ]
+//    [ COLUMNS TERMINATED BY "<column_separator>" ]
+//    [ LINES TERMINATED BY "<line_delimiter>" ]
+//    [ IGNORE <number> {LINES | ROWS} ]
+//    [ (<col_name_or_user_var> [, ... ] ) ]
+//    [ SET (col_name={<expr> | DEFAULT} [, col_name={<expr> | DEFAULT}] ...) ]
+//    [ PROPERTIES ("<key>" = "<value>" [ , ... ]) ]
+// commentSpec:
+//    COMMENT ...
+
+/**
+ * MysqlLoadCommand
+ */
+public class MysqlLoadCommand extends Command {
+    public static final String TIMEOUT_PROPERTY = "timeout";
+    public static final String EXEC_MEM_LIMIT_PROPERTY = "exec_mem_limit";
+    public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio";
+    public static final String STRICT_MODE_PROPERTY = "strict_mode";
+    public static final String TIMEZONE_PROPERTY = "timezone";
+    public static final String ENCLOSE_PROPERTY = "enclose";
+    public static final String ESCAPE_PROPERTY = "escape";
+    public static final String TRIM_DOUBLE_QUOTES_PROPERTY = "trim_double_quotes";
+    public static final String KEY_SKIP_LINES = "skip_lines";
+    public static final String KEY_IN_PARAM_COLUMN_SEPARATOR = "column_separator";
+    public static final String KEY_IN_PARAM_LINE_DELIMITER = "line_delimiter";
+    public static final String KEY_IN_PARAM_COLUMNS = "columns";
+    public static final String KEY_IN_PARAM_TEMP_PARTITIONS = "temporary_partitions";
+    public static final String KEY_IN_PARAM_PARTITIONS = "partitions";
+    public static final String KEY_CLOUD_CLUSTER = "cloud_cluster";
+    private static final Logger LOG = LogManager.getLogger(MysqlLoadCommand.class);
+
+    private static final ImmutableMap<String, Function> PROPERTIES_MAP = new ImmutableMap.Builder<String, Function>()
+            .put(TIMEOUT_PROPERTY, new Function<String, Long>() {
+                @Override
+                public @Nullable Long apply(@Nullable String s) {
+                    return Long.valueOf(s);
+                }
+            })
+            .put(MAX_FILTER_RATIO_PROPERTY, new Function<String, Double>() {
+                @Override
+                public @Nullable Double apply(@Nullable String s) {
+                    return Double.valueOf(s);
+                }
+            })
+            .put(EXEC_MEM_LIMIT_PROPERTY, new Function<String, Long>() {
+                @Override
+                public @Nullable Long apply(@Nullable String s) {
+                    return Long.valueOf(s);
+                }
+            })
+            .put(STRICT_MODE_PROPERTY, new Function<String, Boolean>() {
+                @Override
+                public @Nullable Boolean apply(@Nullable String s) {
+                    return Boolean.valueOf(s);
+                }
+            })
+            .put(TIMEZONE_PROPERTY, new Function<String, String>() {
+                @Override
+                public @Nullable String apply(@Nullable String s) {
+                    return s;
+                }
+            })
+            .put(TRIM_DOUBLE_QUOTES_PROPERTY, new Function<String, Boolean>() {
+                @Override
+                public @Nullable Boolean apply(@Nullable String s) {
+                    return Boolean.valueOf(s);
+                }
+            })
+            .put(ENCLOSE_PROPERTY, new Function<String, String>() {
+                @Override
+                public @Nullable String apply(@Nullable String s) {
+                    return s;
+                }
+            })
+            .put(ESCAPE_PROPERTY, new Function<String, String>() {
+                @Override
+                public @Nullable String apply(@Nullable String s) {
+                    return s;
+                }
+            })
+            .build();
+
+    private final MysqlDataDescription mysqlDataDescription;
+    private final Map<String, String> properties;
+    private final EtlJobType etlJobType = EtlJobType.LOCAL_FILE;
+    private final String comment;
+
+    /**
+     * MysqlLoadCommand
+     */
+    public MysqlLoadCommand(MysqlDataDescription mysqlDataDescription, Map<String, String> properties, String comment) {
+        super(PlanType.MYSQL_LOAD_COMMAND);
+        Objects.requireNonNull(mysqlDataDescription, "mysqlDataDescription is null");
+        Objects.requireNonNull(properties, "properties is null");
+        Objects.requireNonNull(comment, "comment is null");
+
+        this.mysqlDataDescription = mysqlDataDescription;
+        this.properties = properties;
+        this.comment = comment;
+    }
+
+    public MysqlDataDescription getMysqlDataDescription() {
+        return mysqlDataDescription;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    public EtlJobType getEtlJobType() {
+        return etlJobType;
+    }
+
+    public String getComment() {
+        return comment;
+    }
+
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        validate(ctx);
+        handleMysqlLoadComand(ctx);
+    }
+
+    private void validate(ConnectContext ctx) throws UserException, IOException {
+        if (mysqlDataDescription == null) {
+            throw new AnalysisException("No data file in load statement.");
+        }
+

Review Comment:
   looks like some code is missing?
   
   ```
   boolean isLoadFromTable = false;
   for (DataDescription dataDescription : dataDescriptions) {
       if (brokerDesc == null && resourceDesc == null && !isMysqlLoad) {
           dataDescription.setIsHadoopLoad(true);
       }
       String fullDbName = dataDescription.analyzeFullDbName(label.getDbName(), analyzer);
       dataDescription.analyze(fullDbName);
   
       if (dataDescription.isLoadFromTable()) {
           isLoadFromTable = true;
       }
       Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(fullDbName);
       OlapTable table = db.getOlapTableOrAnalysisException(dataDescription.getTableName());
       if (dataDescription.getMergeType() != LoadTask.MergeType.APPEND
               && table.getKeysType() != KeysType.UNIQUE_KEYS) {
           throw new AnalysisException("load by MERGE or DELETE is only 
supported in unique tables.");
       }
       if (dataDescription.getMergeType() != LoadTask.MergeType.APPEND && !table.hasDeleteSign()) {
           throw new AnalysisException("load by MERGE or DELETE need to upgrade 
table to support batch delete.");
       }
       if (brokerDesc != null && !brokerDesc.isMultiLoadBroker()) {
           for (int i = 0; i < dataDescription.getFilePaths().size(); i++) {
               String location = brokerDesc.getFileLocation(dataDescription.getFilePaths().get(i));
               dataDescription.getFilePaths().set(i, location);
               dataDescription.getFilePaths().set(i, dataDescription.getFilePaths().get(i));
           }
       }
   }
   if (isLoadFromTable) {
       if (dataDescriptions.size() > 1) {
           throw new AnalysisException("Only support one olap table load from 
one external table");
       }
       if (resourceDesc == null) {
           throw new AnalysisException("Load from table should use Spark Load");
       }
   }
   ```
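   
   If only the checks relevant to the mysql-load path are needed here (there is no `brokerDesc`/`resourceDesc` on this path), something roughly like the following could be folded into `validate()`. This is a sketch only, adapted from the `LoadStmt` code quoted above: the accessor names on `MysqlDataDescription` (`analyzeFullDbName`, `analyze`, `getMergeType`, `getTableName`) are assumptions, and imports for `Database`, `OlapTable`, `KeysType` and `LoadTask` would be needed.
   
   ```
   private void validate(ConnectContext ctx) throws UserException, IOException {
       if (mysqlDataDescription == null) {
           throw new AnalysisException("No data file in load statement.");
       }
       // Resolve the target db/table and re-run the per-description checks
       // from the old LoadStmt path quoted above (names are assumptions).
       String fullDbName = mysqlDataDescription.analyzeFullDbName(ctx.getDatabase());
       mysqlDataDescription.analyze(fullDbName);
   
       Database db = ctx.getEnv().getInternalCatalog().getDbOrAnalysisException(fullDbName);
       OlapTable table = db.getOlapTableOrAnalysisException(mysqlDataDescription.getTableName());
       if (mysqlDataDescription.getMergeType() != LoadTask.MergeType.APPEND) {
           // MERGE/DELETE loads only make sense for unique-key tables with a delete-sign column.
           if (table.getKeysType() != KeysType.UNIQUE_KEYS) {
               throw new AnalysisException("load by MERGE or DELETE is only supported in unique tables.");
           }
           if (!table.hasDeleteSign()) {
               throw new AnalysisException("load by MERGE or DELETE need to upgrade table to support batch delete.");
           }
       }
   }
   ```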



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

