This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 36eeb107125 [opt](tvf) refine the class of ExternalFileTableValuedFunction (#24706) (#25384) 36eeb107125 is described below commit 36eeb1071250eac6074e9dd423b0a5ad3e69548f Author: Mingyu Chen <morning...@163.com> AuthorDate: Tue Oct 17 14:29:02 2023 +0800 [opt](tvf) refine the class of ExternalFileTableValuedFunction (#24706) (#25384) backport #24706 --- .../org/apache/doris/analysis/DataDescription.java | 16 +- .../org/apache/doris/analysis/OutFileClause.java | 6 +- .../org/apache/doris/analysis/S3TvfLoadStmt.java | 11 +- .../org/apache/doris/analysis/StorageBackend.java | 42 ++++ .../java/org/apache/doris/common/FeConstants.java | 11 +- .../doris/common/util/FileFormatConstants.java | 57 ++++++ .../apache/doris/common/util/FileFormatUtils.java | 108 ++++++++++ .../java/org/apache/doris/common/util/Util.java | 14 +- .../org/apache/doris/planner/ResultFileSink.java | 6 +- .../doris/planner/external/FileQueryScanNode.java | 11 +- .../doris/planner/external/LoadScanProvider.java | 6 +- .../apache/doris/planner/external/TVFScanNode.java | 4 - .../ExternalFileTableValuedFunction.java | 226 +++++++-------------- .../tablefunction/HdfsTableValuedFunction.java | 65 +++--- .../HttpStreamTableValuedFunction.java | 25 +-- .../tablefunction/LocalTableValuedFunction.java | 39 ++-- .../doris/tablefunction/S3TableValuedFunction.java | 133 ++++++------ .../ExternalFileTableValuedFunctionTest.java | 10 +- .../export_p2/test_export_max_file_size.groovy | 4 +- .../suites/export_p2/test_export_with_hdfs.groovy | 2 +- .../test_outfile_orc_max_file_size.groovy | 1 - .../hive/test_different_parquet_types.groovy | 16 +- .../external_table_p0/tvf/test_hdfs_tvf.groovy | 27 +-- .../tvf/test_hdfs_tvf_compression.groovy | 13 -- .../tvf/test_path_partition_keys.groovy | 5 - .../tvf/test_s3_tvf_compression.groovy | 6 + .../external_table_p2/tvf/test_tvf_p2.groovy | 18 +- .../tvf/test_tvf_view_count_p2.groovy | 3 +- .../external_table_p2/tvf/test_tvf_view_p2.groovy | 6 +- 29 files changed, 463 insertions(+), 428 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index fc867847f18..618c80df5c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -28,8 +28,8 @@ import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; +import org.apache.doris.common.util.FileFormatConstants; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.Util; import org.apache.doris.load.loadv2.LoadTask; @@ -1110,13 +1110,13 @@ public class DataDescription implements InsertStmt.DataDesc { // file format // note(tsy): for historical reason, file format here must be string type rather than TFileFormatType if (fileFormat != null) { - if (!fileFormat.equalsIgnoreCase("parquet") - && !fileFormat.equalsIgnoreCase(FeConstants.csv) - && !fileFormat.equalsIgnoreCase("orc") - && !fileFormat.equalsIgnoreCase("json") - && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names) - && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names_and_types) - && !fileFormat.equalsIgnoreCase("hive_text")) { + if 
(!fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_PARQUET) + && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV) + && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES) + && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES) + && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_ORC) + && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_JSON) + && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_HIVE_TEXT)) { throw new AnalysisException("File Format Type " + fileFormat + " is invalid."); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index 742e1636fdc..7d5c4433564 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -23,10 +23,10 @@ import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.FileFormatConstants; import org.apache.doris.common.util.ParseUtil; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.Util; @@ -242,11 +242,11 @@ public class OutFileClause { fileFormatType = TFileFormatType.FORMAT_ORC; break; case "csv_with_names": - headerType = FeConstants.csv_with_names; + headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES; fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; break; case "csv_with_names_and_types": - headerType = FeConstants.csv_with_names_and_types; + headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES; fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; break; default: diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java index a6f864a8c06..ac83f26d49b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java @@ -24,9 +24,9 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.FileFormatConstants; import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.datasource.property.constants.S3Properties.Env; -import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; import org.apache.doris.tablefunction.S3TableValuedFunction; import com.google.common.annotations.VisibleForTesting; @@ -145,7 +145,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt { final List<String> filePaths = dataDescription.getFilePaths(); Preconditions.checkState(filePaths.size() == 1, "there should be only one file path"); final String s3FilePath = filePaths.get(0); - params.put(S3TableValuedFunction.S3_URI, s3FilePath); + params.put(S3TableValuedFunction.PROP_URI, s3FilePath); final Map<String, String> dataDescProp = dataDescription.getProperties(); if (dataDescProp != null) { @@ -153,7 +153,8 @@ public class S3TvfLoadStmt extends NativeInsertStmt { } final String format = 
Optional.ofNullable(dataDescription.getFileFormat()).orElse(DEFAULT_FORMAT); - params.put(ExternalFileTableValuedFunction.FORMAT, format); + params.put(FileFormatConstants.PROP_FORMAT, format); + if (isCsvFormat(format)) { parseSeparator(dataDescription.getColumnSeparatorObj(), params); parseSeparator(dataDescription.getLineDelimiterObj(), params); @@ -161,7 +162,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt { List<String> columnsFromPath = dataDescription.getColumnsFromPath(); if (columnsFromPath != null) { - params.put(ExternalFileTableValuedFunction.PATH_PARTITION_KEYS, + params.put(FileFormatConstants.PROP_PATH_PARTITION_KEYS, String.join(",", columnsFromPath)); } @@ -189,7 +190,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt { } catch (AnalysisException e) { throw new DdlException(String.format("failed to parse separator:%s", separator), e); } - tvfParams.put(ExternalFileTableValuedFunction.COLUMN_SEPARATOR, separator.getSeparator()); + tvfParams.put(FileFormatConstants.PROP_COLUMN_SEPARATOR, separator.getSeparator()); } private static boolean isCsvFormat(String format) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java index 5d6c33c45e7..5d069ea4b23 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java @@ -22,6 +22,7 @@ import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.PrintableMap; +import org.apache.doris.common.util.URI; import org.apache.doris.datasource.property.constants.BosProperties; import org.apache.doris.thrift.TStorageBackendType; @@ -34,6 +35,47 @@ public class StorageBackend implements ParseNode { private String location; private StorageDesc storageDesc; + public static void checkPath(String path, StorageBackend.StorageType type) throws AnalysisException { + if (Strings.isNullOrEmpty(path)) { + throw new AnalysisException("No destination path specified."); + } + checkUri(URI.create(path), type); + } + + public static void checkUri(URI uri, StorageBackend.StorageType type) throws AnalysisException { + String schema = uri.getScheme(); + if (schema == null) { + throw new AnalysisException( + "Invalid export path, there is no schema of URI found. please check your path."); + } + if (type == StorageBackend.StorageType.BROKER) { + if (!schema.equalsIgnoreCase("bos") + && !schema.equalsIgnoreCase("afs") + && !schema.equalsIgnoreCase("hdfs") + && !schema.equalsIgnoreCase("viewfs") + && !schema.equalsIgnoreCase("ofs") + && !schema.equalsIgnoreCase("obs") + && !schema.equalsIgnoreCase("oss") + && !schema.equalsIgnoreCase("s3a") + && !schema.equalsIgnoreCase("cosn") + && !schema.equalsIgnoreCase("gfs") + && !schema.equalsIgnoreCase("jfs") + && !schema.equalsIgnoreCase("gs")) { + throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'viewfs://', 'afs://'," + + " 'bos://', 'ofs://', 'obs://', 'oss://', 's3a://', 'cosn://', 'gfs://', 'gs://'" + + " or 'jfs://' path."); + } + } else if (type == StorageBackend.StorageType.S3 && !schema.equalsIgnoreCase("s3")) { + throw new AnalysisException("Invalid export path. 
please use valid 's3://' path."); + } else if (type == StorageBackend.StorageType.HDFS && !schema.equalsIgnoreCase("hdfs") + && !schema.equalsIgnoreCase("viewfs")) { + throw new AnalysisException("Invalid export path. please use valid 'HDFS://' or 'viewfs://' path."); + } else if (type == StorageBackend.StorageType.LOCAL && !schema.equalsIgnoreCase("file")) { + throw new AnalysisException( + "Invalid export path. please use valid '" + OutFileClause.LOCAL_FILE_PREFIX + "' path."); + } + } + public StorageBackend(String storageName, String location, StorageType storageType, Map<String, String> properties) { this.storageDesc = new StorageDesc(storageName, storageType, properties); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index 0845d593e24..487d50283d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -20,9 +20,6 @@ package org.apache.doris.common; import org.apache.doris.persist.meta.FeMetaFormat; public class FeConstants { - // Database and table's default configurations, we will never change them - public static short default_replication_num = 3; - // The default value of bucket setting && auto bucket without estimate_partition_size public static int default_bucket_num = 10; @@ -39,7 +36,6 @@ public class FeConstants { public static int heartbeat_interval_second = 5; public static int checkpoint_interval_second = 60; // 1 minutes - public static int ip_check_interval_second = 5; // dpp version public static String dpp_version = "3_2_0"; @@ -69,12 +65,6 @@ public class FeConstants { public static long tablet_checker_interval_ms = 20 * 1000L; public static long tablet_schedule_interval_ms = 1000L; - public static String csv = "csv"; - public static String csv_with_names = "csv_with_names"; - public static String csv_with_names_and_types = "csv_with_names_and_types"; - - public static String text = "hive_text"; - public static String FS_PREFIX_S3 = "s3"; public static String FS_PREFIX_S3A = "s3a"; public static String FS_PREFIX_S3N = "s3n"; @@ -90,6 +80,7 @@ public class FeConstants { public static String FS_PREFIX_HDFS = "hdfs"; public static String FS_PREFIX_VIEWFS = "viewfs"; public static String FS_PREFIX_FILE = "file"; + public static final String INTERNAL_DB_NAME = "__internal_schema"; public static String TEMP_MATERIZLIZE_DVIEW_PREFIX = "internal_tmp_materialized_view_"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java new file mode 100644 index 00000000000..7d60222d299 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import java.util.regex.Pattern; + +public class FileFormatConstants { + public static final String DEFAULT_COLUMN_SEPARATOR = "\t"; + public static final String DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR = "\001"; + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + public static final String FORMAT_CSV = "csv"; + public static final String FORMAT_CSV_WITH_NAMES = "csv_with_names"; + public static final String FORMAT_CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types"; + public static final String FORMAT_HIVE_TEXT = "hive_text"; + public static final String FORMAT_PARQUET = "parquet"; + public static final String FORMAT_ORC = "orc"; + public static final String FORMAT_JSON = "json"; + public static final String FORMAT_AVRO = "avro"; + public static final String FORMAT_WAL = "wal"; + + public static final String PROP_FORMAT = "format"; + public static final String PROP_COLUMN_SEPARATOR = "column_separator"; + public static final String PROP_LINE_DELIMITER = "line_delimiter"; + public static final String PROP_JSON_ROOT = "json_root"; + public static final String PROP_JSON_PATHS = "jsonpaths"; + public static final String PROP_STRIP_OUTER_ARRAY = "strip_outer_array"; + public static final String PROP_READ_JSON_BY_LINE = "read_json_by_line"; + public static final String PROP_NUM_AS_STRING = "num_as_string"; + public static final String PROP_FUZZY_PARSE = "fuzzy_parse"; + public static final String PROP_TRIM_DOUBLE_QUOTES = "trim_double_quotes"; + public static final String PROP_SKIP_LINES = "skip_lines"; + public static final String PROP_CSV_SCHEMA = "csv_schema"; + public static final String PROP_COMPRESS_TYPE = "compress_type"; + public static final String PROP_PATH_PARTITION_KEYS = "path_partition_keys"; + + // decimal(p,s) + public static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)"); + // datetime(p) + public static final Pattern DATETIME_TYPE_PATTERN = Pattern.compile("datetime\\((\\d+)\\)"); + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java new file mode 100644 index 00000000000..0b646a00b16 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.FeNameFormat; + +import com.google.common.base.Strings; + +import java.util.List; +import java.util.regex.Matcher; + +public class FileFormatUtils { + + public static boolean isCsv(String formatStr) { + return FileFormatConstants.FORMAT_CSV.equalsIgnoreCase(formatStr) + || FileFormatConstants.FORMAT_CSV_WITH_NAMES.equalsIgnoreCase(formatStr) + || FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES.equalsIgnoreCase(formatStr) + || FileFormatConstants.FORMAT_HIVE_TEXT.equalsIgnoreCase(formatStr); + } + + // public for unit test + public static void parseCsvSchema(List<Column> csvSchema, String csvSchemaStr) + throws AnalysisException { + if (Strings.isNullOrEmpty(csvSchemaStr)) { + return; + } + // the schema str is like: "k1:int;k2:bigint;k3:varchar(20);k4:datetime(6)" + String[] schemaStrs = csvSchemaStr.split(";"); + try { + for (String schemaStr : schemaStrs) { + String[] kv = schemaStr.replace(" ", "").split(":"); + if (kv.length != 2) { + throw new AnalysisException("invalid csv schema: " + csvSchemaStr); + } + Column column = null; + String name = kv[0].toLowerCase(); + FeNameFormat.checkColumnName(name); + String type = kv[1].toLowerCase(); + if (type.equals("tinyint")) { + column = new Column(name, PrimitiveType.TINYINT, true); + } else if (type.equals("smallint")) { + column = new Column(name, PrimitiveType.SMALLINT, true); + } else if (type.equals("int")) { + column = new Column(name, PrimitiveType.INT, true); + } else if (type.equals("bigint")) { + column = new Column(name, PrimitiveType.BIGINT, true); + } else if (type.equals("largeint")) { + column = new Column(name, PrimitiveType.LARGEINT, true); + } else if (type.equals("float")) { + column = new Column(name, PrimitiveType.FLOAT, true); + } else if (type.equals("double")) { + column = new Column(name, PrimitiveType.DOUBLE, true); + } else if (type.startsWith("decimal")) { + // regex decimal(p, s) + Matcher matcher = FileFormatConstants.DECIMAL_TYPE_PATTERN.matcher(type); + if (!matcher.find()) { + throw new AnalysisException("invalid decimal type: " + type); + } + int precision = Integer.parseInt(matcher.group(1)); + int scale = Integer.parseInt(matcher.group(2)); + column = new Column(name, ScalarType.createDecimalV3Type(precision, scale), false, null, true, null, + ""); + } else if (type.equals("date")) { + column = new Column(name, ScalarType.createDateType(), false, null, true, null, ""); + } else if (type.startsWith("datetime")) { + int scale = 0; + if (!type.equals("datetime")) { + // regex datetime(s) + Matcher matcher = FileFormatConstants.DATETIME_TYPE_PATTERN.matcher(type); + if (!matcher.find()) { + throw new AnalysisException("invalid datetime type: " + type); + } + scale = Integer.parseInt(matcher.group(1)); + } + column = new Column(name, ScalarType.createDatetimeV2Type(scale), false, null, true, null, ""); + } else if (type.equals("string")) { + column = new Column(name, PrimitiveType.STRING, true); + } else if (type.equals("boolean")) { + column = new Column(name, PrimitiveType.BOOLEAN, true); + } else { + throw new AnalysisException("unsupported column type: " + type); + } + csvSchema.add(column); + } + } catch (Exception e) { + throw new 
AnalysisException("invalid csv schema: " + e.getMessage()); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java index 2cdc3f1972f..a6924f784fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java @@ -21,7 +21,6 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeNameFormat; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.qe.ConnectContext; @@ -551,17 +550,18 @@ public class Util { public static TFileFormatType getFileFormatTypeFromName(String formatName) { String lowerFileFormat = Objects.requireNonNull(formatName).toLowerCase(); - if (lowerFileFormat.equals("parquet")) { + if (lowerFileFormat.equals(FileFormatConstants.FORMAT_PARQUET)) { return TFileFormatType.FORMAT_PARQUET; - } else if (lowerFileFormat.equals("orc")) { + } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_ORC)) { return TFileFormatType.FORMAT_ORC; - } else if (lowerFileFormat.equals("json")) { + } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_JSON)) { return TFileFormatType.FORMAT_JSON; // csv/csv_with_name/csv_with_names_and_types treat as csv format - } else if (lowerFileFormat.equals(FeConstants.csv) || lowerFileFormat.equals(FeConstants.csv_with_names) - || lowerFileFormat.equals(FeConstants.csv_with_names_and_types) + } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_CSV) + || lowerFileFormat.equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES) + || lowerFileFormat.equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES) // TODO: Add TEXTFILE to TFileFormatType to Support hive text file format. 
- || lowerFileFormat.equals(FeConstants.text)) { + || lowerFileFormat.equals(FileFormatConstants.FORMAT_HIVE_TEXT)) { return TFileFormatType.FORMAT_CSV_PLAIN; } else { return TFileFormatType.FORMAT_UNKNOWN; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java index 6d7031f61c7..d9213360583 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java @@ -20,7 +20,7 @@ package org.apache.doris.planner; import org.apache.doris.analysis.OutFileClause; import org.apache.doris.analysis.StorageBackend; import org.apache.doris.analysis.TupleId; -import org.apache.doris.common.FeConstants; +import org.apache.doris.common.util.FileFormatConstants; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TExplainLevel; @@ -65,8 +65,8 @@ public class ResultFileSink extends DataSink { public ResultFileSink(PlanNodeId exchNodeId, OutFileClause outFileClause, ArrayList<String> labels) { this(exchNodeId, outFileClause); - if (outFileClause.getHeaderType().equals(FeConstants.csv_with_names) - || outFileClause.getHeaderType().equals(FeConstants.csv_with_names_and_types)) { + if (outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES) + || outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) { header = genNames(labels, outFileClause.getColumnSeparator(), outFileClause.getLineDelimiter()); } headerType = outFileClause.getHeaderType(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 67e09cbf205..7b57d9b0454 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -292,7 +292,7 @@ public abstract class FileQueryScanNode extends FileScanNode { for (Split split : inputSplits) { FileSplit fileSplit = (FileSplit) split; TFileType locationType = getLocationType(fileSplit.getPath().toString()); - setLocationPropertiesIfNecessary(locationType, fileSplit, locationProperties); + setLocationPropertiesIfNecessary(locationType, locationProperties); TScanRangeLocations curLocations = newLocations(); // If fileSplit has partition values, use the values collected from hive partitions. 
@@ -368,7 +368,7 @@ public abstract class FileQueryScanNode extends FileScanNode { scanRangeLocations.size(), (System.currentTimeMillis() - start)); } - private void setLocationPropertiesIfNecessary(TFileType locationType, FileSplit fileSplit, + private void setLocationPropertiesIfNecessary(TFileType locationType, Map<String, String> locationProperties) throws UserException { if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { if (!params.isSetHdfsParams()) { @@ -455,13 +455,6 @@ public abstract class FileQueryScanNode extends FileScanNode { protected abstract Map<String, String> getLocationProperties() throws UserException; - // eg: hdfs://namenode s3://buckets - protected String getFsName(FileSplit split) { - String fullPath = split.getPath().toUri().toString(); - String filePath = split.getPath().toUri().getPath(); - return fullPath.replace(filePath, ""); - } - protected static Optional<TFileType> getTFileType(String location) { if (location != null && !location.isEmpty()) { if (S3Util.isObjStorage(location)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java index d36958ad88b..52bb119d7a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java @@ -27,9 +27,9 @@ import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.FileFormatConstants; import org.apache.doris.common.util.Util; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.Load; @@ -131,8 +131,8 @@ public class LoadScanProvider { private String getHeaderType(String formatType) { if (formatType != null) { - if (formatType.equalsIgnoreCase(FeConstants.csv_with_names) || formatType.equalsIgnoreCase( - FeConstants.csv_with_names_and_types)) { + if (formatType.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES) + || formatType.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) { return formatType; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java index c069aa43c30..6cdd9d9834f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java @@ -81,10 +81,6 @@ public class TVFScanNode extends FileQueryScanNode { numNodes = backendPolicy.numBackends(); } - protected String getFsName(FileSplit split) { - return tableValuedFunction.getFsName(); - } - @Override public TFileAttributes getFileAttributes() throws UserException { return tableValuedFunction.getFileAttributes(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index 9ab79eecaaa..8fddb508466 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -32,12 +32,12 @@ import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; -import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.common.util.FileFormatConstants; +import org.apache.doris.common.util.FileFormatUtils; import org.apache.doris.common.util.Util; -import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.external.TVFScanNode; @@ -67,6 +67,7 @@ import org.apache.doris.thrift.TTextSerdeType; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.protobuf.ByteString; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -80,8 +81,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; /** @@ -89,43 +88,22 @@ import java.util.stream.Collectors; */ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctionIf { public static final Logger LOG = LogManager.getLogger(ExternalFileTableValuedFunction.class); - protected static String DEFAULT_COLUMN_SEPARATOR = ","; - protected static String DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR = "\001"; - protected static final String DEFAULT_LINE_DELIMITER = "\n"; - public static final String FORMAT = "format"; - public static final String COLUMN_SEPARATOR = "column_separator"; - public static final String LINE_DELIMITER = "line_delimiter"; - protected static final String JSON_ROOT = "json_root"; - protected static final String JSON_PATHS = "jsonpaths"; - protected static final String STRIP_OUTER_ARRAY = "strip_outer_array"; - protected static final String READ_JSON_BY_LINE = "read_json_by_line"; - protected static final String NUM_AS_STRING = "num_as_string"; - protected static final String FUZZY_PARSE = "fuzzy_parse"; - protected static final String TRIM_DOUBLE_QUOTES = "trim_double_quotes"; - protected static final String SKIP_LINES = "skip_lines"; - protected static final String CSV_SCHEMA = "csv_schema"; - protected static final String COMPRESS_TYPE = "compress_type"; - public static final String PATH_PARTITION_KEYS = "path_partition_keys"; - // decimal(p,s) - private static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)"); - // datetime(p) - private static final Pattern DATETIME_TYPE_PATTERN = Pattern.compile("datetime\\((\\d+)\\)"); protected static final ImmutableSet<String> FILE_FORMAT_PROPERTIES = new ImmutableSet.Builder<String>() - .add(FORMAT) - .add(JSON_ROOT) - .add(JSON_PATHS) - .add(STRIP_OUTER_ARRAY) - .add(READ_JSON_BY_LINE) - .add(NUM_AS_STRING) - .add(FUZZY_PARSE) - .add(COLUMN_SEPARATOR) - .add(LINE_DELIMITER) - .add(TRIM_DOUBLE_QUOTES) - .add(SKIP_LINES) - .add(CSV_SCHEMA) - .add(COMPRESS_TYPE) - .add(PATH_PARTITION_KEYS) + .add(FileFormatConstants.PROP_FORMAT) + .add(FileFormatConstants.PROP_JSON_ROOT) + .add(FileFormatConstants.PROP_JSON_PATHS) + 
.add(FileFormatConstants.PROP_STRIP_OUTER_ARRAY) + .add(FileFormatConstants.PROP_READ_JSON_BY_LINE) + .add(FileFormatConstants.PROP_NUM_AS_STRING) + .add(FileFormatConstants.PROP_FUZZY_PARSE) + .add(FileFormatConstants.PROP_COLUMN_SEPARATOR) + .add(FileFormatConstants.PROP_LINE_DELIMITER) + .add(FileFormatConstants.PROP_TRIM_DOUBLE_QUOTES) + .add(FileFormatConstants.PROP_SKIP_LINES) + .add(FileFormatConstants.PROP_CSV_SCHEMA) + .add(FileFormatConstants.PROP_COMPRESS_TYPE) + .add(FileFormatConstants.PROP_PATH_PARTITION_KEYS) .build(); // Columns got from file and path(if has) @@ -137,17 +115,16 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio private List<String> pathPartitionKeys; protected List<TBrokerFileStatus> fileStatuses = Lists.newArrayList(); - protected Map<String, String> locationProperties; + protected Map<String, String> locationProperties = Maps.newHashMap(); protected String filePath; - - private TFileFormatType fileFormatType; + protected TFileFormatType fileFormatType; private TFileCompressType compressionType; private String headerType = ""; private TTextSerdeType textSerdeType = TTextSerdeType.JSON_TEXT_SERDE; - private String columnSeparator = DEFAULT_COLUMN_SEPARATOR; - private String lineDelimiter = DEFAULT_LINE_DELIMITER; + private String columnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR; + private String lineDelimiter = FileFormatConstants.DEFAULT_LINE_DELIMITER; private String jsonRoot = ""; private String jsonPaths = ""; private boolean stripOuterArray; @@ -175,20 +152,6 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio return locationProperties; } - public List<Column> getCsvSchema() { - return csvSchema; - } - - public String getFsName() { - TFileType fileType = getTFileType(); - if (fileType == TFileType.FILE_HDFS) { - return locationProperties.get(HdfsResource.HADOOP_FS_NAME); - } else if (fileType == TFileType.FILE_S3) { - return locationProperties.get(S3Properties.ENDPOINT); - } - return ""; - } - public List<String> getPathPartitionKeys() { return pathPartitionKeys; } @@ -203,25 +166,29 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio } } - //The keys in the passed validParams map need to be lowercase. - protected void parseProperties(Map<String, String> validParams) throws AnalysisException { - String formatString = validParams.getOrDefault(FORMAT, ""); - String defaultColumnSeparator = DEFAULT_COLUMN_SEPARATOR; + //The keys in properties map need to be lowercase. + protected Map<String, String> parseCommonProperties(Map<String, String> properties) throws AnalysisException { + // Copy the properties, because we will remove the key from properties. 
+ Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + copiedProps.putAll(properties); + + String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, ""); + String defaultColumnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR; switch (formatString) { case "csv": this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; break; case "hive_text": this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; - defaultColumnSeparator = DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR; + defaultColumnSeparator = FileFormatConstants.DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR; this.textSerdeType = TTextSerdeType.HIVE_TEXT_SERDE; break; case "csv_with_names": - this.headerType = FeConstants.csv_with_names; + this.headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES; this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; break; case "csv_with_names_and_types": - this.headerType = FeConstants.csv_with_names_and_types; + this.headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES; this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; break; case "parquet": @@ -240,113 +207,61 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio throw new AnalysisException("format:" + formatString + " is not supported."); } - columnSeparator = validParams.getOrDefault(COLUMN_SEPARATOR, defaultColumnSeparator); + columnSeparator = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_COLUMN_SEPARATOR, + defaultColumnSeparator); if (Strings.isNullOrEmpty(columnSeparator)) { throw new AnalysisException("column_separator can not be empty."); } columnSeparator = Separator.convertSeparator(columnSeparator); - lineDelimiter = validParams.getOrDefault(LINE_DELIMITER, DEFAULT_LINE_DELIMITER); + lineDelimiter = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_LINE_DELIMITER, + FileFormatConstants.DEFAULT_LINE_DELIMITER); if (Strings.isNullOrEmpty(lineDelimiter)) { throw new AnalysisException("line_delimiter can not be empty."); } lineDelimiter = Separator.convertSeparator(lineDelimiter); - jsonRoot = validParams.getOrDefault(JSON_ROOT, ""); - jsonPaths = validParams.getOrDefault(JSON_PATHS, ""); - readJsonByLine = Boolean.valueOf(validParams.get(READ_JSON_BY_LINE)).booleanValue(); - stripOuterArray = Boolean.valueOf(validParams.get(STRIP_OUTER_ARRAY)).booleanValue(); - numAsString = Boolean.valueOf(validParams.get(NUM_AS_STRING)).booleanValue(); - fuzzyParse = Boolean.valueOf(validParams.get(FUZZY_PARSE)).booleanValue(); - trimDoubleQuotes = Boolean.valueOf(validParams.get(TRIM_DOUBLE_QUOTES)).booleanValue(); - skipLines = Integer.valueOf(validParams.getOrDefault(SKIP_LINES, "0")).intValue(); - + jsonRoot = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_JSON_ROOT, ""); + jsonPaths = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_JSON_PATHS, ""); + readJsonByLine = Boolean.valueOf( + getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_READ_JSON_BY_LINE, "")).booleanValue(); + stripOuterArray = Boolean.valueOf( + getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_STRIP_OUTER_ARRAY, "")).booleanValue(); + numAsString = Boolean.valueOf( + getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_NUM_AS_STRING, "")).booleanValue(); + fuzzyParse = Boolean.valueOf( + getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FUZZY_PARSE, "")).booleanValue(); + trimDoubleQuotes = Boolean.valueOf( + getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_TRIM_DOUBLE_QUOTES, 
"")).booleanValue(); + skipLines = Integer.valueOf( + getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_SKIP_LINES, "0")).intValue(); + + String compressTypeStr = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_COMPRESS_TYPE, "UNKNOWN"); try { - compressionType = Util.getFileCompressType(validParams.getOrDefault(COMPRESS_TYPE, "UNKNOWN")); + compressionType = Util.getFileCompressType(compressTypeStr); } catch (IllegalArgumentException e) { - throw new AnalysisException("Compress type : " + validParams.get(COMPRESS_TYPE) + " is not supported."); + throw new AnalysisException("Compress type : " + compressTypeStr + " is not supported."); } - if (formatString.equals("csv") || formatString.equals("csv_with_names") - || formatString.equals("csv_with_names_and_types")) { - parseCsvSchema(csvSchema, validParams); + if (FileFormatUtils.isCsv(formatString)) { + FileFormatUtils.parseCsvSchema(csvSchema, getOrDefaultAndRemove(copiedProps, + FileFormatConstants.PROP_CSV_SCHEMA, "")); + LOG.debug("get csv schema: {}", csvSchema); } - pathPartitionKeys = Optional.ofNullable(validParams.get(PATH_PARTITION_KEYS)) - .map(str -> - Arrays.stream(str.split(",")) - .map(String::trim) - .collect(Collectors.toList())) + + pathPartitionKeys = Optional.ofNullable( + getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_PATH_PARTITION_KEYS, null)) + .map(str -> Arrays.stream(str.split(",")) + .map(String::trim) + .collect(Collectors.toList())) .orElse(Lists.newArrayList()); + + return copiedProps; } - // public for unit test - public static void parseCsvSchema(List<Column> csvSchema, Map<String, String> validParams) - throws AnalysisException { - String csvSchemaStr = validParams.get(CSV_SCHEMA); - if (Strings.isNullOrEmpty(csvSchemaStr)) { - return; - } - // the schema str is like: "k1:int;k2:bigint;k3:varchar(20);k4:datetime(6)" - String[] schemaStrs = csvSchemaStr.split(";"); - try { - for (String schemaStr : schemaStrs) { - String[] kv = schemaStr.replace(" ", "").split(":"); - if (kv.length != 2) { - throw new AnalysisException("invalid csv schema: " + csvSchemaStr); - } - Column column = null; - String name = kv[0].toLowerCase(); - FeNameFormat.checkColumnName(name); - String type = kv[1].toLowerCase(); - if (type.equals("tinyint")) { - column = new Column(name, PrimitiveType.TINYINT, true); - } else if (type.equals("smallint")) { - column = new Column(name, PrimitiveType.SMALLINT, true); - } else if (type.equals("int")) { - column = new Column(name, PrimitiveType.INT, true); - } else if (type.equals("bigint")) { - column = new Column(name, PrimitiveType.BIGINT, true); - } else if (type.equals("largeint")) { - column = new Column(name, PrimitiveType.LARGEINT, true); - } else if (type.equals("float")) { - column = new Column(name, PrimitiveType.FLOAT, true); - } else if (type.equals("double")) { - column = new Column(name, PrimitiveType.DOUBLE, true); - } else if (type.startsWith("decimal")) { - // regex decimal(p, s) - Matcher matcher = DECIMAL_TYPE_PATTERN.matcher(type); - if (!matcher.find()) { - throw new AnalysisException("invalid decimal type: " + type); - } - int precision = Integer.parseInt(matcher.group(1)); - int scale = Integer.parseInt(matcher.group(2)); - column = new Column(name, ScalarType.createDecimalV3Type(precision, scale), false, null, true, null, - ""); - } else if (type.equals("date")) { - column = new Column(name, ScalarType.createDateType(), false, null, true, null, ""); - } else if (type.startsWith("datetime")) { - int scale = 0; - if 
(!type.equals("datetime")) { - // regex datetime(s) - Matcher matcher = DATETIME_TYPE_PATTERN.matcher(type); - if (!matcher.find()) { - throw new AnalysisException("invalid datetime type: " + type); - } - scale = Integer.parseInt(matcher.group(1)); - } - column = new Column(name, ScalarType.createDatetimeV2Type(scale), false, null, true, null, ""); - } else if (type.equals("string")) { - column = new Column(name, PrimitiveType.STRING, true); - } else if (type.equals("boolean")) { - column = new Column(name, PrimitiveType.BOOLEAN, true); - } else { - throw new AnalysisException("unsupported column type: " + type); - } - csvSchema.add(column); - } - LOG.debug("get csv schema: {}", csvSchema); - } catch (Exception e) { - throw new AnalysisException("invalid csv schema: " + e.getMessage()); - } + protected String getOrDefaultAndRemove(Map<String, String> props, String key, String defaultValue) { + String value = props.getOrDefault(key, defaultValue); + props.remove(key); + return value; } public List<TBrokerFileStatus> getFileStatuses() { @@ -541,3 +456,4 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java index eb8e8f70f7b..051706ae474 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java @@ -18,16 +18,14 @@ package org.apache.doris.tablefunction; import org.apache.doris.analysis.BrokerDesc; -import org.apache.doris.analysis.ExportStmt; +import org.apache.doris.analysis.StorageBackend; import org.apache.doris.analysis.StorageBackend.StorageType; import org.apache.doris.catalog.HdfsResource; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.URI; import org.apache.doris.thrift.TFileType; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; -import org.apache.commons.collections.map.CaseInsensitiveMap; +import com.google.common.base.Strings; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -39,50 +37,41 @@ import java.util.Map; */ public class HdfsTableValuedFunction extends ExternalFileTableValuedFunction { public static final Logger LOG = LogManager.getLogger(HdfsTableValuedFunction.class); - public static final String NAME = "hdfs"; - public static final String HDFS_URI = "uri"; - // simple or kerberos + private static final String PROP_URI = "uri"; + + public HdfsTableValuedFunction(Map<String, String> properties) throws AnalysisException { + init(properties); + } - private static final ImmutableSet<String> LOCATION_PROPERTIES = new ImmutableSet.Builder<String>() - .add(HDFS_URI) - .add(HdfsResource.HADOOP_SECURITY_AUTHENTICATION) - .add(HdfsResource.HADOOP_FS_NAME) - .add(HdfsResource.HADOOP_USER_NAME) - .add(HdfsResource.HADOOP_KERBEROS_PRINCIPAL) - .add(HdfsResource.HADOOP_KERBEROS_KEYTAB) - .add(HdfsResource.HADOOP_SHORT_CIRCUIT) - .add(HdfsResource.HADOOP_SOCKET_PATH) - .build(); + private void init(Map<String, String> properties) throws AnalysisException { + // 1. analyze common properties + Map<String, String> otherProps = super.parseCommonProperties(properties); - private URI hdfsUri; + // 2. 
analyze uri + String uriStr = getOrDefaultAndRemove(otherProps, PROP_URI, null); + if (Strings.isNullOrEmpty(uriStr)) { + throw new AnalysisException(String.format("Properties '%s' is required.", PROP_URI)); + } + URI uri = URI.create(uriStr); + StorageBackend.checkUri(uri, StorageType.HDFS); + filePath = uri.getScheme() + "://" + uri.getAuthority() + uri.getPath(); - public HdfsTableValuedFunction(Map<String, String> params) throws AnalysisException { - Map<String, String> fileParams = new CaseInsensitiveMap(); - locationProperties = Maps.newHashMap(); - for (String key : params.keySet()) { - String lowerKey = key.toLowerCase(); - if (FILE_FORMAT_PROPERTIES.contains(lowerKey)) { - fileParams.put(lowerKey, params.get(key)); - } else if (LOCATION_PROPERTIES.contains(lowerKey)) { - locationProperties.put(lowerKey, params.get(key)); - } else if (HdfsResource.HADOOP_FS_NAME.equalsIgnoreCase(key)) { + // 3. analyze other properties + for (String key : otherProps.keySet()) { + if (HdfsResource.HADOOP_FS_NAME.equalsIgnoreCase(key)) { // because HADOOP_FS_NAME contains upper and lower case - locationProperties.put(HdfsResource.HADOOP_FS_NAME, params.get(key)); + locationProperties.put(HdfsResource.HADOOP_FS_NAME, otherProps.get(key)); } else { - locationProperties.put(key, params.get(key)); + locationProperties.put(key, otherProps.get(key)); } } - - if (!locationProperties.containsKey(HDFS_URI)) { - throw new AnalysisException(String.format("Configuration '%s' is required.", HDFS_URI)); + // If the user does not specify the HADOOP_FS_NAME, we will use the uri's scheme and authority + if (!locationProperties.containsKey(HdfsResource.HADOOP_FS_NAME)) { + locationProperties.put(HdfsResource.HADOOP_FS_NAME, uri.getScheme() + "://" + uri.getAuthority()); } - ExportStmt.checkPath(locationProperties.get(HDFS_URI), StorageType.HDFS); - hdfsUri = URI.create(locationProperties.get(HDFS_URI)); - filePath = locationProperties.get(HdfsResource.HADOOP_FS_NAME) + hdfsUri.getPath(); - - super.parseProperties(fileParams); + // 4. 
parse file parseFile(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java index 265045d7a6f..5044f045c31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java @@ -20,9 +20,9 @@ package org.apache.doris.tablefunction; import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.StorageBackend.StorageType; import org.apache.doris.common.AnalysisException; +import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; -import org.apache.commons.collections.map.CaseInsensitiveMap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,24 +36,15 @@ public class HttpStreamTableValuedFunction extends ExternalFileTableValuedFuncti private static final Logger LOG = LogManager.getLogger(HttpStreamTableValuedFunction.class); public static final String NAME = "http_stream"; - public HttpStreamTableValuedFunction(Map<String, String> params) throws AnalysisException { - Map<String, String> fileParams = new CaseInsensitiveMap(); - for (String key : params.keySet()) { - String lowerKey = key.toLowerCase(); - if (!FILE_FORMAT_PROPERTIES.contains(lowerKey)) { - throw new AnalysisException(key + " is invalid property"); - } - fileParams.put(lowerKey, params.get(key).toLowerCase()); - } + public HttpStreamTableValuedFunction(Map<String, String> properties) throws AnalysisException { + // 1. analyze common properties + super.parseCommonProperties(properties); - String formatString = fileParams.getOrDefault(FORMAT, "").toLowerCase(); - if (formatString.equals("parquet") - || formatString.equals("avro") - || formatString.equals("orc")) { - throw new AnalysisException("current http_stream does not yet support parquet, avro and orc"); + if (fileFormatType == TFileFormatType.FORMAT_PARQUET + || fileFormatType == TFileFormatType.FORMAT_AVRO + || fileFormatType == TFileFormatType.FORMAT_ORC) { + throw new AnalysisException("http_stream does not yet support parquet, avro and orc"); } - - super.parseProperties(fileParams); } // =========== implement abstract methods of ExternalFileTableValuedFunction ================= diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java index 129c3f930c7..350621e3550 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java @@ -31,8 +31,6 @@ import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TNetworkAddress; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; -import org.apache.commons.collections.map.CaseInsensitiveMap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -46,42 +44,31 @@ import java.util.concurrent.TimeUnit; */ public class LocalTableValuedFunction extends ExternalFileTableValuedFunction { private static final Logger LOG = LogManager.getLogger(LocalTableValuedFunction.class); - public static final String NAME = "local"; - public static final String FILE_PATH = "file_path"; - public static final String BACKEND_ID = "backend_id"; + public static 
final String PROP_FILE_PATH = "file_path"; + public static final String PROP_BACKEND_ID = "backend_id"; private static final ImmutableSet<String> LOCATION_PROPERTIES = new ImmutableSet.Builder<String>() - .add(FILE_PATH) - .add(BACKEND_ID) + .add(PROP_FILE_PATH) + .add(PROP_BACKEND_ID) .build(); private long backendId; - public LocalTableValuedFunction(Map<String, String> params) throws AnalysisException { - Map<String, String> fileParams = new CaseInsensitiveMap(); - locationProperties = Maps.newHashMap(); - for (String key : params.keySet()) { - String lowerKey = key.toLowerCase(); - if (FILE_FORMAT_PROPERTIES.contains(lowerKey)) { - fileParams.put(lowerKey, params.get(key)); - } else if (LOCATION_PROPERTIES.contains(lowerKey)) { - locationProperties.put(lowerKey, params.get(key)); - } else { - throw new AnalysisException(key + " is invalid property"); - } - } + public LocalTableValuedFunction(Map<String, String> properties) throws AnalysisException { + // 1. analyze common properties + Map<String, String> otherProps = super.parseCommonProperties(properties); + // 2. analyze location properties for (String key : LOCATION_PROPERTIES) { - if (!locationProperties.containsKey(key)) { - throw new AnalysisException(String.format("Configuration '%s' is required.", key)); + if (!otherProps.containsKey(key)) { + throw new AnalysisException(String.format("Property '%s' is required.", key)); } } + filePath = otherProps.get(PROP_FILE_PATH); + backendId = Long.parseLong(otherProps.get(PROP_BACKEND_ID)); - filePath = locationProperties.get(FILE_PATH); - backendId = Long.parseLong(locationProperties.get(BACKEND_ID)); - super.parseProperties(fileParams); - + // 3. parse file getFileListFromBackend(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java index 504730daaee..9ad6232c4e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java @@ -30,10 +30,9 @@ import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.thrift.TFileType; -import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; -import java.util.HashMap; import java.util.Map; /** @@ -49,71 +48,46 @@ import java.util.Map; */ public class S3TableValuedFunction extends ExternalFileTableValuedFunction { public static final String NAME = "s3"; - public static final String S3_URI = "uri"; + public static final String PROP_URI = "uri"; private static final ImmutableSet<String> DEPRECATED_KEYS = - ImmutableSet.of("access_key", "secret_key", "session_token", "region"); - - private static final ImmutableSet<String> OPTIONAL_KEYS = - ImmutableSet.of(S3Properties.SESSION_TOKEN, PropertyConverter.USE_PATH_STYLE, S3Properties.REGION, - PATH_PARTITION_KEYS); - - private static final ImmutableSet<String> LOCATION_PROPERTIES = ImmutableSet.<String>builder() - .add(S3_URI) - .add(S3Properties.ENDPOINT) - .addAll(DEPRECATED_KEYS) - .addAll(S3Properties.TVF_REQUIRED_FIELDS) - .addAll(OPTIONAL_KEYS) - .build(); - - private final S3URI s3uri; - private final boolean forceVirtualHosted; - private String virtualBucket = ""; + ImmutableSet.of("access_key", "secret_key", "session_token", "region", + "ACCESS_KEY", "SECRET_KEY", "SESSION_TOKEN", "REGION"); - public 
S3TableValuedFunction(Map<String, String> params) throws AnalysisException { + private String virtualBucket = ""; - Map<String, String> fileParams = new HashMap<>(); - for (Map.Entry<String, String> entry : params.entrySet()) { - String key = entry.getKey(); - String lowerKey = key.toLowerCase(); - if (!LOCATION_PROPERTIES.contains(lowerKey) && !FILE_FORMAT_PROPERTIES.contains(lowerKey)) { - throw new AnalysisException("Invalid property: " + key); - } - if (DEPRECATED_KEYS.contains(lowerKey)) { - lowerKey = S3Properties.S3_PREFIX + lowerKey; - } - fileParams.put(lowerKey, entry.getValue()); - } + public S3TableValuedFunction(Map<String, String> properties) throws AnalysisException { + // 1. analyze common properties + Map<String, String> otherProps = super.parseCommonProperties(properties); - if (!fileParams.containsKey(S3_URI)) { - throw new AnalysisException("Missing required property: " + S3_URI); + // 2. analyze uri and other properties + String uriStr = getOrDefaultAndRemove(otherProps, PROP_URI, null); + if (Strings.isNullOrEmpty(uriStr)) { + throw new AnalysisException(String.format("Properties '%s' is required.", PROP_URI)); } - - forceVirtualHosted = isVirtualHosted(fileParams); - s3uri = getS3Uri(fileParams); - final String endpoint = forceVirtualHosted - ? getEndpointAndSetVirtualBucket(params) - : s3uri.getBucketScheme(); - if (!fileParams.containsKey(S3Properties.REGION)) { + forwardCompatibleDeprecatedKeys(otherProps); + + String usePathStyle = getOrDefaultAndRemove(otherProps, PropertyConverter.USE_PATH_STYLE, "false"); + boolean forceVirtualHosted = isVirtualHosted(uriStr, Boolean.parseBoolean(usePathStyle)); + S3URI s3uri = getS3Uri(uriStr, forceVirtualHosted); + String endpoint = forceVirtualHosted + ? getEndpointAndSetVirtualBucket(s3uri, otherProps) : s3uri.getBucketScheme(); + if (!otherProps.containsKey(S3Properties.REGION)) { String region = S3Properties.getRegionOfEndpoint(endpoint); - fileParams.put(S3Properties.REGION, region); + otherProps.put(S3Properties.REGION, region); } + checkNecessaryS3Properties(otherProps); CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(endpoint, - fileParams.get(S3Properties.REGION), - fileParams.get(S3Properties.ACCESS_KEY), - fileParams.get(S3Properties.SECRET_KEY)); - if (fileParams.containsKey(S3Properties.SESSION_TOKEN)) { - credential.setSessionToken(fileParams.get(S3Properties.SESSION_TOKEN)); + otherProps.get(S3Properties.REGION), + otherProps.get(S3Properties.ACCESS_KEY), + otherProps.get(S3Properties.SECRET_KEY)); + if (otherProps.containsKey(S3Properties.SESSION_TOKEN)) { + credential.setSessionToken(otherProps.get(S3Properties.SESSION_TOKEN)); } - // set S3 location properties - // these five properties is necessary, no one can be lost. 
locationProperties = S3Properties.credentialToMap(credential); - String usePathStyle = fileParams.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false"); locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle); - this.locationProperties.putAll(S3ClientBEProperties.getBeFSProperties(this.locationProperties)); - - super.parseProperties(fileParams); + locationProperties.putAll(S3ClientBEProperties.getBeFSProperties(locationProperties)); if (forceVirtualHosted) { filePath = NAME + S3URI.SCHEME_DELIM + virtualBucket + S3URI.PATH_DELIM @@ -130,39 +104,59 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction { } } - private String getEndpointAndSetVirtualBucket(Map<String, String> params) throws AnalysisException { - Preconditions.checkState(forceVirtualHosted, "only invoked when force virtual hosted."); - String[] fileds = s3uri.getVirtualBucket().split("\\.", 2); - virtualBucket = fileds[0]; - if (fileds.length > 1) { + private void forwardCompatibleDeprecatedKeys(Map<String, String> props) { + for (String deprecatedKey : DEPRECATED_KEYS) { + String value = props.remove(deprecatedKey); + if (!Strings.isNullOrEmpty(value)) { + props.put("s3." + deprecatedKey.toLowerCase(), value); + } + } + } + + private void checkNecessaryS3Properties(Map<String, String> props) throws AnalysisException { + if (Strings.isNullOrEmpty(props.get(S3Properties.REGION))) { + throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.REGION)); + } + if (Strings.isNullOrEmpty(props.get(S3Properties.ACCESS_KEY))) { + throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ACCESS_KEY)); + } + if (Strings.isNullOrEmpty(props.get(S3Properties.SECRET_KEY))) { + throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.SECRET_KEY)); + } + } + + private String getEndpointAndSetVirtualBucket(S3URI s3uri, Map<String, String> props) + throws AnalysisException { + String[] fields = s3uri.getVirtualBucket().split("\\.", 2); + virtualBucket = fields[0]; + if (fields.length > 1) { // At this point, s3uri.getVirtualBucket() is: virtualBucket.endpoint, Eg: // uri: http://my_bucket.cos.ap-beijing.myqcloud.com/file.txt // s3uri.getVirtualBucket() = my_bucket.cos.ap-beijing.myqcloud.com, // so we need separate virtualBucket and endpoint. - return fileds[1]; - } else if (params.containsKey(S3Properties.ENDPOINT)) { - return params.get(S3Properties.ENDPOINT); + return fields[1]; + } else if (props.containsKey(S3Properties.ENDPOINT)) { + return props.get(S3Properties.ENDPOINT); } else { throw new AnalysisException("can not parse endpoint, please check uri."); } } - private boolean isVirtualHosted(Map<String, String> validParams) { - String originUri = validParams.getOrDefault(S3_URI, ""); - if (originUri.toLowerCase().startsWith("s3")) { + private boolean isVirtualHosted(String uri, boolean usePathStyle) { + if (uri.toLowerCase().startsWith("s3")) { // s3 protocol, default virtual-hosted style return true; } else { // not s3 protocol, forceVirtualHosted is determined by USE_PATH_STYLE. 
- return !Boolean.parseBoolean(validParams.get(PropertyConverter.USE_PATH_STYLE)); + return !usePathStyle; } } - private S3URI getS3Uri(Map<String, String> validParams) throws AnalysisException { + private S3URI getS3Uri(String uri, boolean forceVirtualHosted) throws AnalysisException { try { - return S3URI.create(validParams.get(S3_URI), forceVirtualHosted); + return S3URI.create(uri, forceVirtualHosted); } catch (UserException e) { - throw new AnalysisException("parse s3 uri failed, uri = " + validParams.get(S3_URI), e); + throw new AnalysisException("parse s3 uri failed, uri = " + uri, e); } } @@ -189,3 +183,4 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction { return "S3TableValuedFunction"; } } + diff --git a/fe/fe-core/src/test/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunctionTest.java index f664415e6d5..e5b06bd5dd4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunctionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunctionTest.java @@ -21,6 +21,8 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; +import org.apache.doris.common.util.FileFormatConstants; +import org.apache.doris.common.util.FileFormatUtils; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -35,12 +37,12 @@ public class ExternalFileTableValuedFunctionTest { public void testCsvSchemaParse() { Config.enable_date_conversion = true; Map<String, String> properties = Maps.newHashMap(); - properties.put(ExternalFileTableValuedFunction.CSV_SCHEMA, + properties.put(FileFormatConstants.PROP_CSV_SCHEMA, "k1:int;k2:bigint;k3:float;k4:double;k5:smallint;k6:tinyint;k7:bool;" + "k8:char(10);k9:varchar(20);k10:date;k11:datetime;k12:decimal(10,2)"); List<Column> csvSchema = Lists.newArrayList(); try { - ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, properties); + FileFormatUtils.parseCsvSchema(csvSchema, properties.get(FileFormatConstants.PROP_CSV_SCHEMA)); Assert.fail(); } catch (AnalysisException e) { e.printStackTrace(); @@ -48,11 +50,11 @@ public class ExternalFileTableValuedFunctionTest { } csvSchema.clear(); - properties.put(ExternalFileTableValuedFunction.CSV_SCHEMA, + properties.put(FileFormatConstants.PROP_CSV_SCHEMA, "k1:int;k2:bigint;k3:float;k4:double;k5:smallint;k6:tinyint;k7:boolean;" + "k8:string;k9:date;k10:datetime;k11:decimal(10, 2);k12:decimal( 38,10); k13:datetime(5)"); try { - ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, properties); + FileFormatUtils.parseCsvSchema(csvSchema, properties.get(FileFormatConstants.PROP_CSV_SCHEMA)); Assert.assertEquals(13, csvSchema.size()); Column decimalCol = csvSchema.get(10); Assert.assertEquals(10, decimalCol.getPrecision()); diff --git a/regression-test/suites/export_p2/test_export_max_file_size.groovy b/regression-test/suites/export_p2/test_export_max_file_size.groovy index 0efe3a82cfa..460bc130268 100644 --- a/regression-test/suites/export_p2/test_export_max_file_size.groovy +++ b/regression-test/suites/export_p2/test_export_max_file_size.groovy @@ -73,8 +73,8 @@ suite("test_export_max_file_size", "p2") { insert into ${table_export_name} select * from hdfs( "uri" = "hdfs://${nameNodeHost}:${hdfsPort}${load_data_path}", - "fs.defaultFS" = "${fs}", 
"hadoop.username" = "${user_name}", + "column_separator" = ",", "format" = "csv"); """ @@ -126,8 +126,8 @@ suite("test_export_max_file_size", "p2") { insert into ${table_load_name} select * from hdfs( "uri" = "${outfile_url}${j}.csv", - "fs.defaultFS" = "${fs}", "hadoop.username" = "${user_name}", + "column_separator" = ",", "format" = "csv"); """ } diff --git a/regression-test/suites/export_p2/test_export_with_hdfs.groovy b/regression-test/suites/export_p2/test_export_with_hdfs.groovy index 205b1ffd716..d108d355e29 100644 --- a/regression-test/suites/export_p2/test_export_with_hdfs.groovy +++ b/regression-test/suites/export_p2/test_export_with_hdfs.groovy @@ -97,8 +97,8 @@ suite("test_export_with_hdfs", "p2") { // check data correctness order_qt_select """ select * from hdfs( "uri" = "${outfile_url}0.${file_suffix}", - "fs.defaultFS" = "${fs}", "hadoop.username" = "${user_name}", + "column_separator" = ",", "format" = "${format}"); """ } diff --git a/regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy b/regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy index 3f9abe2c2b2..42a2354dd95 100644 --- a/regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy +++ b/regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy @@ -58,7 +58,6 @@ suite("test_outfile_orc_max_file_size", "p2") { insert into ${table_export_name} select * from hdfs( "uri" = "hdfs://${nameNodeHost}:${hdfsPort}${load_data_path}", - "fs.defaultFS" = "${fs}", "hadoop.username" = "${user_name}", "format" = "orc"); """ diff --git a/regression-test/suites/external_table_p0/hive/test_different_parquet_types.groovy b/regression-test/suites/external_table_p0/hive/test_different_parquet_types.groovy index beb3cd3e0cf..b9005037a5c 100644 --- a/regression-test/suites/external_table_p0/hive/test_different_parquet_types.groovy +++ b/regression-test/suites/external_table_p0/hive/test_different_parquet_types.groovy @@ -35,7 +35,7 @@ suite("test_different_parquet_types", "p0") { logger.info("record res" + res1_2.toString()) def res1_3 = sql """ - select * from hdfs(\"uri" = \"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_byte_array/delta_byte_array.parquet\",\"fs.defaultFS\" = \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10 + select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_byte_array/delta_byte_array.parquet\",\"format\" = \"parquet\") limit 10 """ logger.info("record res" + res1_3.toString()) } @@ -58,7 +58,7 @@ suite("test_different_parquet_types", "p0") { //return nothing,but no exception def res3_3 = sql """ - select * from hdfs(\"uri" = \"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_binary_packed/delta_binary_packed.parquet\",\"fs.defaultFS\" = \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10 + select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_binary_packed/delta_binary_packed.parquet\",\"format\" = \"parquet\") limit 10 """ logger.info("record res" + res3_3.toString()) } @@ -76,7 +76,7 @@ suite("test_different_parquet_types", "p0") { logger.info("record res" + res4_2.toString()) def res4_3 = sql """ - select * from hdfs(\"uri" = 
\"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_encoding_required_column/delta_encoding_required_column.parquet\",\"fs.defaultFS\" = \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10 + select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_encoding_required_column/delta_encoding_required_column.parquet\",\"format\" = \"parquet\") limit 10 """ logger.info("record res" + res4_3.toString()) } @@ -95,7 +95,7 @@ suite("test_different_parquet_types", "p0") { logger.info("record res" + res5_2.toString()) def res5_3 = sql """ - select * from hdfs(\"uri" = \"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_encoding_optional_column/delta_encoding_optional_column.parquet\",\"fs.defaultFS\" = \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10 + select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_encoding_optional_column/delta_encoding_optional_column.parquet\",\"format\" = \"parquet\") limit 10 """ logger.info("record res" + res5_3.toString()) } @@ -114,7 +114,7 @@ suite("test_different_parquet_types", "p0") { logger.info("record res" + res6_2.toString()) def res6_3 = sql """ - select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/datapage_v1-snappy-compressed-checksum/datapage_v1-snappy-compressed-checksum.parquet\",\"fs.defaultFS\" = \"hdfs://${externalEnvIp}:${hdfs_port}\",\"format\" = \"parquet\") limit 10 + select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/datapage_v1-snappy-compressed-checksum/datapage_v1-snappy-compressed-checksum.parquet\",\"format\" = \"parquet\") limit 10 """ logger.info("record res" + res6_3.toString()) @@ -133,7 +133,7 @@ suite("test_different_parquet_types", "p0") { logger.info("record res" + res7_2.toString()) def res7_3 = sql """ - select * from hdfs(\"uri" = \"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/overflow_i16_page_cnt/overflow_i16_page_cnt.parquet\",\"fs.defaultFS\" = \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10 + select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/overflow_i16_page_cnt/overflow_i16_page_cnt.parquet\",\"format\" = \"parquet\") limit 10 """ logger.info("record res" + res7_3.toString()) } @@ -152,7 +152,7 @@ suite("test_different_parquet_types", "p0") { logger.info("record res" + res8_2.toString()) def res8_3 = sql """ - select * from hdfs(\"uri" = \"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/alltypes_tiny_pages/alltypes_tiny_pages.parquet\",\"fs.defaultFS\" = \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10 + select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/alltypes_tiny_pages/alltypes_tiny_pages.parquet\",\"format\" = \"parquet\") limit 10 """ logger.info("record res" + res8_3.toString()) } @@ -170,7 +170,7 @@ suite("test_different_parquet_types", "p0") { logger.info("record res" + res9_2.toString()) def res9_3 = sql """ - select * from hdfs(\"uri" = 
\"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/alltypes_tiny_pages_plain/alltypes_tiny_pages_plain.parquet\",\"fs.defaultFS\" = \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10 + select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/alltypes_tiny_pages_plain/alltypes_tiny_pages_plain.parquet\",\"format\" = \"parquet\") limit 10 """ logger.info("record res" + res9_3.toString()) } diff --git a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy index 0535eb6505a..a4b9bdd71c8 100644 --- a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy +++ b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy @@ -32,8 +32,8 @@ suite("test_hdfs_tvf") { format = "csv" qt_csv_all_types """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", + "column_separator" = ",", "format" = "${format}") order by c1; """ @@ -41,15 +41,14 @@ suite("test_hdfs_tvf") { format = "csv" qt_csv_student """ select cast(c1 as INT) as id, c2 as name, c3 as age from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", + "column_separator" = ",", "format" = "${format}") order by id; """ uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/array_malformat.csv" format = "csv" qt_csv_array_malformat """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "column_separator" = "|") order by c1; """ @@ -57,7 +56,6 @@ suite("test_hdfs_tvf") { uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/array_normal.csv" format = "csv" qt_csv_array_normal """ select * from HDFS("uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "column_separator" = "|") order by c1; """ @@ -67,9 +65,9 @@ suite("test_hdfs_tvf") { format = "csv" qt_csv_with_compress_type """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", + "column_separator" = ",", "compress_type" = "GZ") order by c1; """ // test csv format infer compress type @@ -77,8 +75,8 @@ suite("test_hdfs_tvf") { format = "csv" qt_csv_infer_compress_type """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", + "column_separator" = ",", "format" = "${format}") order by c1; """ // test csv_with_names file format @@ -86,8 +84,8 @@ suite("test_hdfs_tvf") { format = "csv_with_names" qt_csv_names """ select cast(id as INT) as id, name, age from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", + "column_separator" = ",", "format" = "${format}") order by id; """ // test csv_with_names_and_types file format @@ -95,8 +93,8 @@ suite("test_hdfs_tvf") { format = "csv_with_names_and_types" qt_csv_names_types """ select cast(id as INT) as id, name, age from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", + "column_separator" = ",", "format" = "${format}") order by id; """ @@ -105,7 +103,6 @@ suite("test_hdfs_tvf") { format = "parquet" qt_parquet """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = 
"${format}") order by s_suppkey limit 20; """ @@ -114,7 +111,6 @@ suite("test_hdfs_tvf") { format = "orc" qt_orc """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}") order by p_partkey limit 20; """ @@ -124,7 +120,6 @@ suite("test_hdfs_tvf") { format = "json" qt_json """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "false", @@ -135,7 +130,6 @@ suite("test_hdfs_tvf") { format = "json" qt_json_limit1 """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "false", @@ -145,7 +139,6 @@ suite("test_hdfs_tvf") { format = "json" qt_json_limit2 """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "true", @@ -154,7 +147,6 @@ suite("test_hdfs_tvf") { format = "json" qt_json_limit3 """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "false", @@ -163,7 +155,6 @@ suite("test_hdfs_tvf") { format = "json" qt_json_limit4 """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "false", @@ -175,7 +166,6 @@ suite("test_hdfs_tvf") { format = "json" qt_json_root """ select cast(id as INT) as id, city, cast(code as INT) as code from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "false", @@ -187,7 +177,6 @@ suite("test_hdfs_tvf") { format = "json" qt_json_paths """ select cast(id as INT) as id, cast(code as INT) as code from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "false", @@ -199,7 +188,6 @@ suite("test_hdfs_tvf") { format = "json" qt_one_array """ select cast(id as INT) as id, city, cast(code as INT) as code from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "true", @@ -211,7 +199,6 @@ suite("test_hdfs_tvf") { format = "json" qt_cast """ select cast(id as INT) as id, city, cast(code as INT) as code from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "false", @@ -240,7 +227,6 @@ suite("test_hdfs_tvf") { select cast (id as INT) as id, city, cast (code as INT) as code from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "false", @@ -256,7 +242,6 @@ suite("test_hdfs_tvf") { format = "parquet" qt_desc """ desc function HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}"); """ } finally { diff --git a/regression-test/suites/external_table_p2/tvf/test_hdfs_tvf_compression.groovy b/regression-test/suites/external_table_p2/tvf/test_hdfs_tvf_compression.groovy index 40dc3c24405..d71e07487ca 100644 --- a/regression-test/suites/external_table_p2/tvf/test_hdfs_tvf_compression.groovy +++ b/regression-test/suites/external_table_p2/tvf/test_hdfs_tvf_compression.groovy 
@@ -30,7 +30,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem qt_gz_1 """ select ${select_field} from HDFS( "uri" = "${baseUri}/dt=gzip/000000_0.gz", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "column_separator" = '\001', @@ -40,7 +39,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem qt_gz_2 """ desc function HDFS( "uri" = "${baseUri}/dt=gzip/000000_0.gz", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "column_separator" = '\001', @@ -52,7 +50,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select ${select_field} from HDFS( "uri" = "${baseUri}/dt=bzip2/000000_0.bz2", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "column_separator" = '\001', @@ -64,7 +61,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select ${select_field} from HDFS( "uri" = "${baseUri}/dt=deflate/000000_0_copy_1.deflate", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "column_separator" = '\001', @@ -75,7 +71,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select c7 from HDFS( "uri" = "${baseUri}/dt=deflate/000000_0_copy_1.deflate", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "column_separator" = '\001', @@ -88,7 +83,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select ${select_field} from HDFS( "uri" = "${baseUri}/dt=plain/000000_0", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "column_separator" = '\001', @@ -99,7 +93,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select c3,c4,c10 from HDFS( "uri" = "${baseUri}/dt=plain/000000_0", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "column_separator" = '\001', @@ -114,7 +107,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select count(*) from HDFS( "uri" = "${test_data_dir}/test_data/ckbench_hits.part-00000.snappy.parquet", - "fs.defaultFS" = "${baseFs}", "format" = "parquet" ); """ @@ -124,7 +116,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select count(*) from HDFS( "uri" = "${test_data_dir}/test_data/ckbench_hits.part-00000.snappy.parquet", - "fs.defaultFS" = "${baseFs}", "format" = "parquet" ); """ @@ -135,7 +126,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select count(*) from HDFS( "uri" = "${test_data_dir}/test_data/ckbench_hits.000000_0.orc", - "fs.defaultFS" = "${baseFs}", "format" = "orc" ); """ @@ -145,7 +135,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select count(*) from HDFS( "uri" = "${test_data_dir}/test_data/ckbench_hits.000000_0.orc", - "fs.defaultFS" = "${baseFs}", "format" = "orc" ); """ @@ -156,7 +145,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select count(*) from HDFS( "uri" = "${test_data_dir}/test_data/tpcds_catalog_returns_data-m-00000.txt", - "fs.defaultFS" = "${baseFs}", "format" = "csv" ); """ @@ -166,7 +154,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select count(*) from HDFS( "uri" = "${test_data_dir}/test_data/tpcds_catalog_returns_data-m-00000.txt", - "fs.defaultFS" = "${baseFs}", "format" = "csv" ); """ diff 
--git a/regression-test/suites/external_table_p2/tvf/test_path_partition_keys.groovy b/regression-test/suites/external_table_p2/tvf/test_path_partition_keys.groovy index ad572936aec..dcd98af2f9d 100644 --- a/regression-test/suites/external_table_p2/tvf/test_path_partition_keys.groovy +++ b/regression-test/suites/external_table_p2/tvf/test_path_partition_keys.groovy @@ -27,7 +27,6 @@ suite("test_path_partition_keys", "p2,external,tvf,external_remote,external_remo order_qt_hdfs_1 """ select * from HDFS( "uri" = "${baseUri}/dt1=cyw/*", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "path_partition_keys"="dt1" ) order by c1,c2 ; @@ -36,7 +35,6 @@ suite("test_path_partition_keys", "p2,external,tvf,external_remote,external_remo order_qt_hdfs_2 """ select * from HDFS( "uri" = "${baseUri}/dt1=cyw/*", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "path_partition_keys"="dt1") where dt1!="cyw" order by c1,c2 limit 3; @@ -45,7 +43,6 @@ suite("test_path_partition_keys", "p2,external,tvf,external_remote,external_remo order_qt_hdfs_3 """ select dt1,c1,count(*) from HDFS( "uri" = "${baseUri}/dt1=hello/*", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "path_partition_keys"="dt1") group by c1,dt1 order by c1; @@ -54,7 +51,6 @@ suite("test_path_partition_keys", "p2,external,tvf,external_remote,external_remo order_qt_hdfs_4 """ select * from HDFS( "uri" = "${baseUri}/dt2=two/dt1=hello/*", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "path_partition_keys"="dt1") order by c1; @@ -63,7 +59,6 @@ suite("test_path_partition_keys", "p2,external,tvf,external_remote,external_remo order_qt_hdfs_5 """ select * from HDFS( "uri" = "${baseUri}/dt2=two/dt1=cyw/*", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "path_partition_keys"="dt2,dt1"); diff --git a/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy b/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy index 57cfdb136d0..279fcb5e8a5 100644 --- a/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy +++ b/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy @@ -34,6 +34,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot "s3.secret_key" = "${sk}", "REGION" = "${region}", "FORMAT" = "csv", + "column_separator" = ",", "use_path_style" = "true", "compress_type" ="${compress_type}") order by c1,c2,c3,c4,c5 limit 20; """ @@ -47,6 +48,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot "s3.secret_key" = "${sk}", "REGION" = "${region}", "FORMAT" = "csv", + "column_separator" = ",", "use_path_style" = "true", "compress_type" ="${compress_type}") order by cast(c1 as int),c4 limit 20; """ @@ -62,6 +64,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot "s3.secret_key" = "${sk}", "REGION" = "${region}", "FORMAT" = "csv", + "column_separator" = ",", "use_path_style" = "true", "compress_type" ="${compress_type}") order by c1,c2,c3,c4,c5 limit 15; """ @@ -75,6 +78,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot "s3.secret_key" = "${sk}", "REGION" = "${region}", "FORMAT" = "csv", + "column_separator" = ",", "use_path_style" = "true", "compress_type" ="${compress_type}") where c1!="100" order by cast(c4 as date),c1 limit 13; """ @@ -90,6 +94,7 @@ suite("test_s3_tvf_compression", 
"p2,external,tvf,external_remote,external_remot "s3.secret_key" = "${sk}", "REGION" = "${region}", "FORMAT" = "csv", + "column_separator" = ",", "use_path_style" = "true", "compress_type" ="${compress_type}FRAME") order by c1,c2,c3,c4,c5 limit 14; """ @@ -103,6 +108,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot "s3.secret_key" = "${sk}", "REGION" = "${region}", "FORMAT" = "csv", + "column_separator" = ",", "use_path_style" = "true", "compress_type" ="${compress_type}FRAME") where c3="buHDwfGeNHfpRFdNaogneddi" order by c3,c1 limit 14; """ diff --git a/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy b/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy index 202bc6b9148..08776efd8ec 100644 --- a/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy +++ b/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy @@ -23,43 +23,41 @@ suite("test_tvf_p2", "p2") { qt_eof_check """select * from hdfs( "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/bad_store_sales.parquet", - "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}", "format" = "parquet") where ss_store_sk = 4 and ss_addr_sk is null order by ss_item_sk""" // array_ancestor_null.parquet is parquet file whose values in the array column are all nulls in a page qt_array_ancestor_null """select count(list_double_col) from hdfs( "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/array_ancestor_null.parquet", - "format" = "parquet", - "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")""" + "format" = "parquet"); + """ // all_nested_types.parquet is parquet file that contains all complext types qt_nested_types_parquet """select count(array0), count(array1), count(array2), count(array3), count(struct0), count(struct1), count(map0) from hdfs( "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/all_nested_types.parquet", - "format" = "parquet", - "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")""" + "format" = "parquet"); + """ // all_nested_types.orc is orc file that contains all complext types qt_nested_types_orc """select count(array0), count(array1), count(array2), count(array3), count(struct0), count(struct1), count(map0) from hdfs( "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/orc/all_nested_types.orc", - "format" = "orc", - "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")""" + "format" = "orc"); + """ // a row of complex type may be stored across more pages qt_row_cross_pages """select count(id), count(m1), count(m2) from hdfs( "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages.parquet", - "format" = "parquet", - "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")""" + "format" = "parquet"); + """ // viewfs qt_viewfs """select count(id), count(m1), count(m2) from hdfs( "uri" = "viewfs://my-cluster/ns1/catalog/tvf/parquet/row_cross_pages.parquet", "format" = "parquet", - "fs.defaultFS" = "viewfs://my-cluster", "fs.viewfs.mounttable.my-cluster.link./ns1" = "hdfs://${nameNodeHost}:${hdfsPort}/", "fs.viewfs.mounttable.my-cluster.homedir" = "/ns1")""" } diff --git a/regression-test/suites/external_table_p2/tvf/test_tvf_view_count_p2.groovy b/regression-test/suites/external_table_p2/tvf/test_tvf_view_count_p2.groovy index 6510571427e..32b9bac9c70 100644 --- a/regression-test/suites/external_table_p2/tvf/test_tvf_view_count_p2.groovy +++ b/regression-test/suites/external_table_p2/tvf/test_tvf_view_count_p2.groovy @@ -26,8 +26,7 @@ suite("test_tvf_view_count_p2", 
"p2,external,tvf,external_remote,external_remote sql """use test_tvf_view_count_p2""" sql """set enable_nereids_planner=false""" sql """create view tvf_view_count as select * from hdfs ( - "uri"="hdfs://${nameNodeHost}:${hdfsPort}:/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0", - "fs.defaultFS"="hdfs://${nameNodeHost}:${hdfsPort}", + "uri"="hdfs://${nameNodeHost}:${hdfsPort}/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0", "hadoop.username" = "hadoop", "format"="parquet");""" diff --git a/regression-test/suites/external_table_p2/tvf/test_tvf_view_p2.groovy b/regression-test/suites/external_table_p2/tvf/test_tvf_view_p2.groovy index 2323fcaff8a..8939154bb53 100644 --- a/regression-test/suites/external_table_p2/tvf/test_tvf_view_p2.groovy +++ b/regression-test/suites/external_table_p2/tvf/test_tvf_view_p2.groovy @@ -26,8 +26,7 @@ suite("test_tvf_view_p2", "p2,external,tvf,external_remote,external_remote_tvf") sql """use test_tvf_view_p2""" sql """set enable_fallback_to_original_planner=false""" sql """create view tvf_view as select * from hdfs ( - "uri"="hdfs://${nameNodeHost}:${hdfsPort}:/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0", - "fs.defaultFS"="hdfs://${nameNodeHost}:${hdfsPort}", + "uri"="hdfs://${nameNodeHost}:${hdfsPort}/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0", "hadoop.username" = "hadoop", "format"="parquet");""" @@ -48,8 +47,7 @@ suite("test_tvf_view_p2", "p2,external,tvf,external_remote,external_remote_tvf") } explain{ sql("select * from hdfs (\n" + - " \"uri\"=\"hdfs://${nameNodeHost}:${hdfsPort}:/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0\",\n" + - " \"fs.defaultFS\"=\"hdfs://${nameNodeHost}:${hdfsPort}\",\n" + + " \"uri\"=\"hdfs://${nameNodeHost}:${hdfsPort}/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0\",\n" + " \"hadoop.username\" = \"hadoop\",\n" + " \"format\"=\"parquet\")") contains("_table_valued_function_hdfs.p_partkey") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org