morningman commented on code in PR #23485:
URL: https://github.com/apache/doris/pull/23485#discussion_r1311857291

##########
fe/fe-core/src/main/java/org/apache/doris/analysis/BulkStorageDesc.java:
##########
@@ -0,0 +1,128 @@
[...]
+// Broker descriptor
+//
+// Broker example:
+// WITH S3/HDFS
+// (
+//   "username" = "user0",
+//   "password" = "password0"
+// )
+public class BulkStorageDesc implements Writable, GsonPostProcessable {
+    @SerializedName(value = "name")
+    private String name;
+    @SerializedName(value = "storageType")
+    protected StorageType storageType;
+    @SerializedName(value = "properties")
+    protected Map<String, String> properties;
[...]
+    @Override
+    public void gsonPostProcess() throws IOException {}

Review Comment:
   No need to implement `GsonPostProcessable` if it is unnecessary.
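   For illustration only (a sketch, assuming nothing has to run after GSON deserialization for this class, and following the usual Writable + GsonUtils pattern already implied by the imports above):

       // Sketch, not a drop-in patch: keep only Writable when gsonPostProcess() has no work to do.
       public class BulkStorageDesc implements Writable {
           // ... same fields, enum and constructors as above ...

           @Override
           public void write(DataOutput out) throws IOException {
               Text.writeString(out, GsonUtils.GSON.toJson(this));
           }

           public static BulkStorageDesc read(DataInput in) throws IOException {
               return GsonUtils.GSON.fromJson(Text.readString(in), BulkStorageDesc.class);
           }
       }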

##########
fe/fe-core/src/main/java/org/apache/doris/analysis/BulkLoadDataDesc.java:
##########
@@ -0,0 +1,376 @@
[...]
+    public String toInsertSql(Map<String, String> properties) {
[...]
+        sb.append(" FROM ");
+
+        // TODO: check if s3
+        // if is s3 data desc type
+        sb.append(" s3( ");
+        Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();

Review Comment:
   You can use `PrintableMap` to do this.
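   For example, something along these lines (a sketch; the exact `PrintableMap` constructor arguments and entry delimiter should be checked against `org.apache.doris.common.util.PrintableMap`):

       // Sketch: let PrintableMap render the "key"="value" pairs instead of a manual loop.
       // Assumed arguments: key/value separator "=", quote keys and values, no line wrapping.
       sb.append(" s3( ");
       sb.append(new PrintableMap<>(properties, "=", true, false).toString());
       sb.append(")");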

##########
fe/fe-core/src/main/java/org/apache/doris/analysis/BulkStorageDesc.java:
##########
@@ -0,0 +1,128 @@
[...]
+    public enum StorageType {
+        BROKER,
+        S3,
+        HDFS,
+        LOCAL;
+    }
[...]
+    public TFileType getFileType() {
+        switch (storageType) {
+            case LOCAL:
+                return TFileType.FILE_LOCAL;
+            case S3:
+                return TFileType.FILE_S3;
+            case HDFS:
+                return TFileType.FILE_HDFS;
+            case BROKER:
+            default:
+                return TFileType.FILE_BROKER;
+        }
+    }
+
+    public StorageType getStorageType() {

Review Comment:
   Maybe we should find a way to unify `StorageType` and `TFileType`. `TFileType` is a bad name, but we have to keep it for compatibility. Still, it is strange to have both `getStorageType()` and `getFileType()` on the same class.
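   One possible direction, just as a sketch (not a concrete request for this PR): let the enum own the thrift mapping so there is a single conversion point.

       // Sketch: StorageType carries its TFileType, so the switch in getFileType() disappears.
       public enum StorageType {
           BROKER(TFileType.FILE_BROKER),
           S3(TFileType.FILE_S3),
           HDFS(TFileType.FILE_HDFS),
           LOCAL(TFileType.FILE_LOCAL);

           private final TFileType fileType;

           StorageType(TFileType fileType) {
               this.fileType = fileType;
           }

           public TFileType toTFileType() {
               return fileType;
           }
       }

       public TFileType getFileType() {
           return storageType.toTFileType();
       }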

##########
fe/fe-core/src/main/java/org/apache/doris/analysis/BulkLoadDataDesc.java:
##########
@@ -0,0 +1,376 @@
[...]
+    public String toInsertSql(Map<String, String> properties) {
[...]
+        Expression rewrittenWhere = rewriteByColumnMappingSet(whereExpr);
+        String rewrittenWhereClause = rewriteByPrecedingFilter(rewrittenWhere, precedingFilterExpr);
+        sb.append(" WHERE ").append(rewrittenWhereClause);
+        return sb.toString();
+    }
+
+    private void verifyLoadColumns(List<String> fileFieldNames, List<String> targetColumns) {
+    }
+
+    private String rewriteByPrecedingFilter(Expression whereExpr, Expression precedingFilterExpr) {
+        return whereExpr.toSql();

Review Comment:
   I am not sure that `toSql()` can recover the original SQL string exactly. How about just saving the original SQL string when parsing the load statement?
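   Roughly like this (a sketch; `whereClauseOriginalSql` is a hypothetical field that the parser would have to populate when the statement is parsed):

       // Hypothetical field holding the predicate text exactly as the user wrote it.
       private String whereClauseOriginalSql;

       private String rewriteByPrecedingFilter(Expression whereExpr, Expression precedingFilterExpr) {
           // Prefer the recorded original text; fall back to toSql() only if it is missing.
           if (whereClauseOriginalSql != null) {
               return whereClauseOriginalSql;
           }
           return whereExpr.toSql();
       }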

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org