This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 36eeb107125 [opt](tvf) refine the class of ExternalFileTableValuedFunction (#24706) (#25384)
36eeb107125 is described below

commit 36eeb1071250eac6074e9dd423b0a5ad3e69548f
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Tue Oct 17 14:29:02 2023 +0800

    [opt](tvf) refine the class of ExternalFileTableValuedFunction (#24706) (#25384)
    
    backport #24706
---
 .../org/apache/doris/analysis/DataDescription.java |  16 +-
 .../org/apache/doris/analysis/OutFileClause.java   |   6 +-
 .../org/apache/doris/analysis/S3TvfLoadStmt.java   |  11 +-
 .../org/apache/doris/analysis/StorageBackend.java  |  42 ++++
 .../java/org/apache/doris/common/FeConstants.java  |  11 +-
 .../doris/common/util/FileFormatConstants.java     |  57 ++++++
 .../apache/doris/common/util/FileFormatUtils.java  | 108 ++++++++++
 .../java/org/apache/doris/common/util/Util.java    |  14 +-
 .../org/apache/doris/planner/ResultFileSink.java   |   6 +-
 .../doris/planner/external/FileQueryScanNode.java  |  11 +-
 .../doris/planner/external/LoadScanProvider.java   |   6 +-
 .../apache/doris/planner/external/TVFScanNode.java |   4 -
 .../ExternalFileTableValuedFunction.java           | 226 +++++++--------------
 .../tablefunction/HdfsTableValuedFunction.java     |  65 +++---
 .../HttpStreamTableValuedFunction.java             |  25 +--
 .../tablefunction/LocalTableValuedFunction.java    |  39 ++--
 .../doris/tablefunction/S3TableValuedFunction.java | 133 ++++++------
 .../ExternalFileTableValuedFunctionTest.java       |  10 +-
 .../export_p2/test_export_max_file_size.groovy     |   4 +-
 .../suites/export_p2/test_export_with_hdfs.groovy  |   2 +-
 .../test_outfile_orc_max_file_size.groovy          |   1 -
 .../hive/test_different_parquet_types.groovy       |  16 +-
 .../external_table_p0/tvf/test_hdfs_tvf.groovy     |  27 +--
 .../tvf/test_hdfs_tvf_compression.groovy           |  13 --
 .../tvf/test_path_partition_keys.groovy            |   5 -
 .../tvf/test_s3_tvf_compression.groovy             |   6 +
 .../external_table_p2/tvf/test_tvf_p2.groovy       |  18 +-
 .../tvf/test_tvf_view_count_p2.groovy              |   3 +-
 .../external_table_p2/tvf/test_tvf_view_p2.groovy  |   6 +-
 29 files changed, 463 insertions(+), 428 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index fc867847f18..618c80df5c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -28,8 +28,8 @@ import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.FileFormatConstants;
 import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.load.loadv2.LoadTask;
@@ -1110,13 +1110,13 @@ public class DataDescription implements InsertStmt.DataDesc {
         // file format
         // note(tsy): for historical reason, file format here must be string type rather than TFileFormatType
         if (fileFormat != null) {
-            if (!fileFormat.equalsIgnoreCase("parquet")
-                    && !fileFormat.equalsIgnoreCase(FeConstants.csv)
-                    && !fileFormat.equalsIgnoreCase("orc")
-                    && !fileFormat.equalsIgnoreCase("json")
-                    && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names)
-                    && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names_and_types)
-                    && !fileFormat.equalsIgnoreCase("hive_text")) {
+            if (!fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_PARQUET)
+                    && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV)
+                    && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
+                    && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)
+                    && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_ORC)
+                    && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_JSON)
+                    && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_HIVE_TEXT)) {
                throw new AnalysisException("File Format Type " + fileFormat + " is invalid.");
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 742e1636fdc..7d5c4433564 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -23,10 +23,10 @@ import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.FileFormatConstants;
 import org.apache.doris.common.util.ParseUtil;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.Util;
@@ -242,11 +242,11 @@ public class OutFileClause {
                 fileFormatType = TFileFormatType.FORMAT_ORC;
                 break;
             case "csv_with_names":
-                headerType = FeConstants.csv_with_names;
+                headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES;
                 fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
                 break;
             case "csv_with_names_and_types":
-                headerType = FeConstants.csv_with_names_and_types;
+                headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES;
                 fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
                 break;
             default:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java
index a6f864a8c06..ac83f26d49b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java
@@ -24,9 +24,9 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.FileFormatConstants;
 import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.datasource.property.constants.S3Properties.Env;
-import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
 import org.apache.doris.tablefunction.S3TableValuedFunction;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -145,7 +145,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt {
         final List<String> filePaths = dataDescription.getFilePaths();
         Preconditions.checkState(filePaths.size() == 1, "there should be only one file path");
         final String s3FilePath = filePaths.get(0);
-        params.put(S3TableValuedFunction.S3_URI, s3FilePath);
+        params.put(S3TableValuedFunction.PROP_URI, s3FilePath);
 
         final Map<String, String> dataDescProp = dataDescription.getProperties();
         if (dataDescProp != null) {
@@ -153,7 +153,8 @@ public class S3TvfLoadStmt extends NativeInsertStmt {
         }
 
         final String format = Optional.ofNullable(dataDescription.getFileFormat()).orElse(DEFAULT_FORMAT);
-        params.put(ExternalFileTableValuedFunction.FORMAT, format);
+        params.put(FileFormatConstants.PROP_FORMAT, format);
+
         if (isCsvFormat(format)) {
             parseSeparator(dataDescription.getColumnSeparatorObj(), params);
             parseSeparator(dataDescription.getLineDelimiterObj(), params);
@@ -161,7 +162,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt {
 
         List<String> columnsFromPath = dataDescription.getColumnsFromPath();
         if (columnsFromPath != null) {
-            params.put(ExternalFileTableValuedFunction.PATH_PARTITION_KEYS,
+            params.put(FileFormatConstants.PROP_PATH_PARTITION_KEYS,
                     String.join(",", columnsFromPath));
         }
 
@@ -189,7 +190,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt {
         } catch (AnalysisException e) {
             throw new DdlException(String.format("failed to parse separator:%s", separator), e);
         }
-        tvfParams.put(ExternalFileTableValuedFunction.COLUMN_SEPARATOR, separator.getSeparator());
+        tvfParams.put(FileFormatConstants.PROP_COLUMN_SEPARATOR, separator.getSeparator());
     }
 
     private static boolean isCsvFormat(String format) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
index 5d6c33c45e7..5d069ea4b23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.common.util.URI;
 import org.apache.doris.datasource.property.constants.BosProperties;
 import org.apache.doris.thrift.TStorageBackendType;
 
@@ -34,6 +35,47 @@ public class StorageBackend implements ParseNode {
     private String location;
     private StorageDesc storageDesc;
 
+    public static void checkPath(String path, StorageBackend.StorageType type) throws AnalysisException {
+        if (Strings.isNullOrEmpty(path)) {
+            throw new AnalysisException("No destination path specified.");
+        }
+        checkUri(URI.create(path), type);
+    }
+
+    public static void checkUri(URI uri, StorageBackend.StorageType type) throws AnalysisException {
+        String schema = uri.getScheme();
+        if (schema == null) {
+            throw new AnalysisException(
+                    "Invalid export path, there is no schema of URI found. please check your path.");
+        }
+        if (type == StorageBackend.StorageType.BROKER) {
+            if (!schema.equalsIgnoreCase("bos")
+                    && !schema.equalsIgnoreCase("afs")
+                    && !schema.equalsIgnoreCase("hdfs")
+                    && !schema.equalsIgnoreCase("viewfs")
+                    && !schema.equalsIgnoreCase("ofs")
+                    && !schema.equalsIgnoreCase("obs")
+                    && !schema.equalsIgnoreCase("oss")
+                    && !schema.equalsIgnoreCase("s3a")
+                    && !schema.equalsIgnoreCase("cosn")
+                    && !schema.equalsIgnoreCase("gfs")
+                    && !schema.equalsIgnoreCase("jfs")
+                    && !schema.equalsIgnoreCase("gs")) {
+                throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'viewfs://', 'afs://',"
+                        + " 'bos://', 'ofs://', 'obs://', 'oss://', 's3a://', 'cosn://', 'gfs://', 'gs://'"
+                        + " or 'jfs://' path.");
+            }
+        } else if (type == StorageBackend.StorageType.S3 && !schema.equalsIgnoreCase("s3")) {
+            throw new AnalysisException("Invalid export path. please use valid 's3://' path.");
+        } else if (type == StorageBackend.StorageType.HDFS && !schema.equalsIgnoreCase("hdfs")
+                && !schema.equalsIgnoreCase("viewfs")) {
+            throw new AnalysisException("Invalid export path. please use valid 'HDFS://' or 'viewfs://' path.");
+        } else if (type == StorageBackend.StorageType.LOCAL && !schema.equalsIgnoreCase("file")) {
+            throw new AnalysisException(
+                    "Invalid export path. please use valid '" + OutFileClause.LOCAL_FILE_PREFIX + "' path.");
+        }
+    }
+
     public StorageBackend(String storageName, String location,
             StorageType storageType, Map<String, String> properties) {
         this.storageDesc = new StorageDesc(storageName, storageType, properties);
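
    [Illustration only, not part of this patch: the new static checkPath()/checkUri() helpers take over the
    path-scheme validation that HdfsTableValuedFunction previously reached through ExportStmt.checkPath (see
    the hunk further below). The example paths are invented; a minimal sketch of the expected behavior:]

    import org.apache.doris.analysis.StorageBackend;
    import org.apache.doris.common.AnalysisException;

    public class StorageBackendCheckSketch {
        public static void main(String[] args) {
            try {
                // Scheme matches the storage type: passes silently.
                StorageBackend.checkPath("hdfs://nameservice1/user/doris/export/", StorageBackend.StorageType.HDFS);
                // Scheme does not match the storage type: throws AnalysisException,
                // e.g. "Invalid export path. please use valid 'HDFS://' or 'viewfs://' path."
                StorageBackend.checkPath("s3://my-bucket/export/", StorageBackend.StorageType.HDFS);
            } catch (AnalysisException e) {
                System.out.println(e.getMessage());
            }
        }
    }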
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 0845d593e24..487d50283d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -20,9 +20,6 @@ package org.apache.doris.common;
 import org.apache.doris.persist.meta.FeMetaFormat;
 
 public class FeConstants {
-    // Database and table's default configurations, we will never change them
-    public static short default_replication_num = 3;
-
     // The default value of bucket setting && auto bucket without estimate_partition_size
     public static int default_bucket_num = 10;
 
@@ -39,7 +36,6 @@ public class FeConstants {
 
     public static int heartbeat_interval_second = 5;
     public static int checkpoint_interval_second = 60; // 1 minutes
-    public static int ip_check_interval_second = 5;
 
     // dpp version
     public static String dpp_version = "3_2_0";
@@ -69,12 +65,6 @@ public class FeConstants {
     public static long tablet_checker_interval_ms = 20 * 1000L;
     public static long tablet_schedule_interval_ms = 1000L;
 
-    public static String csv = "csv";
-    public static String csv_with_names = "csv_with_names";
-    public static String csv_with_names_and_types = "csv_with_names_and_types";
-
-    public static String text = "hive_text";
-
     public static String FS_PREFIX_S3 = "s3";
     public static String FS_PREFIX_S3A = "s3a";
     public static String FS_PREFIX_S3N = "s3n";
@@ -90,6 +80,7 @@ public class FeConstants {
     public static String FS_PREFIX_HDFS = "hdfs";
     public static String FS_PREFIX_VIEWFS = "viewfs";
     public static String FS_PREFIX_FILE = "file";
+
     public static final String INTERNAL_DB_NAME = "__internal_schema";
     public static String TEMP_MATERIZLIZE_DVIEW_PREFIX = "internal_tmp_materialized_view_";
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
new file mode 100644
index 00000000000..7d60222d299
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import java.util.regex.Pattern;
+
+public class FileFormatConstants {
+    public static final String DEFAULT_COLUMN_SEPARATOR = "\t";
+    public static final String DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR = "\001";
+    public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+    public static final String FORMAT_CSV = "csv";
+    public static final String FORMAT_CSV_WITH_NAMES = "csv_with_names";
+    public static final String FORMAT_CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types";
+    public static final String FORMAT_HIVE_TEXT = "hive_text";
+    public static final String FORMAT_PARQUET = "parquet";
+    public static final String FORMAT_ORC = "orc";
+    public static final String FORMAT_JSON = "json";
+    public static final String FORMAT_AVRO = "avro";
+    public static final String FORMAT_WAL = "wal";
+
+    public static final String PROP_FORMAT = "format";
+    public static final String PROP_COLUMN_SEPARATOR = "column_separator";
+    public static final String PROP_LINE_DELIMITER = "line_delimiter";
+    public static final String PROP_JSON_ROOT = "json_root";
+    public static final String PROP_JSON_PATHS = "jsonpaths";
+    public static final String PROP_STRIP_OUTER_ARRAY = "strip_outer_array";
+    public static final String PROP_READ_JSON_BY_LINE = "read_json_by_line";
+    public static final String PROP_NUM_AS_STRING = "num_as_string";
+    public static final String PROP_FUZZY_PARSE = "fuzzy_parse";
+    public static final String PROP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
+    public static final String PROP_SKIP_LINES = "skip_lines";
+    public static final String PROP_CSV_SCHEMA = "csv_schema";
+    public static final String PROP_COMPRESS_TYPE = "compress_type";
+    public static final String PROP_PATH_PARTITION_KEYS = "path_partition_keys";
+
+    // decimal(p,s)
+    public static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)");
+    // datetime(p)
+    public static final Pattern DATETIME_TYPE_PATTERN = Pattern.compile("datetime\\((\\d+)\\)");
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java
new file mode 100644
index 00000000000..0b646a00b16
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeNameFormat;
+
+import com.google.common.base.Strings;
+
+import java.util.List;
+import java.util.regex.Matcher;
+
+public class FileFormatUtils {
+
+    public static boolean isCsv(String formatStr) {
+        return FileFormatConstants.FORMAT_CSV.equalsIgnoreCase(formatStr)
+                || 
FileFormatConstants.FORMAT_CSV_WITH_NAMES.equalsIgnoreCase(formatStr)
+                || 
FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES.equalsIgnoreCase(formatStr)
+                || 
FileFormatConstants.FORMAT_HIVE_TEXT.equalsIgnoreCase(formatStr);
+    }
+
+    // public for unit test
+    public static void parseCsvSchema(List<Column> csvSchema, String csvSchemaStr)
+            throws AnalysisException {
+        if (Strings.isNullOrEmpty(csvSchemaStr)) {
+            return;
+        }
+        // the schema str is like: "k1:int;k2:bigint;k3:varchar(20);k4:datetime(6)"
+        String[] schemaStrs = csvSchemaStr.split(";");
+        try {
+            for (String schemaStr : schemaStrs) {
+                String[] kv = schemaStr.replace(" ", "").split(":");
+                if (kv.length != 2) {
+                    throw new AnalysisException("invalid csv schema: " + csvSchemaStr);
+                }
+                Column column = null;
+                String name = kv[0].toLowerCase();
+                FeNameFormat.checkColumnName(name);
+                String type = kv[1].toLowerCase();
+                if (type.equals("tinyint")) {
+                    column = new Column(name, PrimitiveType.TINYINT, true);
+                } else if (type.equals("smallint")) {
+                    column = new Column(name, PrimitiveType.SMALLINT, true);
+                } else if (type.equals("int")) {
+                    column = new Column(name, PrimitiveType.INT, true);
+                } else if (type.equals("bigint")) {
+                    column = new Column(name, PrimitiveType.BIGINT, true);
+                } else if (type.equals("largeint")) {
+                    column = new Column(name, PrimitiveType.LARGEINT, true);
+                } else if (type.equals("float")) {
+                    column = new Column(name, PrimitiveType.FLOAT, true);
+                } else if (type.equals("double")) {
+                    column = new Column(name, PrimitiveType.DOUBLE, true);
+                } else if (type.startsWith("decimal")) {
+                    // regex decimal(p, s)
+                    Matcher matcher = FileFormatConstants.DECIMAL_TYPE_PATTERN.matcher(type);
+                    if (!matcher.find()) {
+                        throw new AnalysisException("invalid decimal type: " + type);
+                    }
+                    int precision = Integer.parseInt(matcher.group(1));
+                    int scale = Integer.parseInt(matcher.group(2));
+                    column = new Column(name, ScalarType.createDecimalV3Type(precision, scale), false, null, true, null,
+                            "");
+                } else if (type.equals("date")) {
+                    column = new Column(name, ScalarType.createDateType(), false, null, true, null, "");
+                } else if (type.startsWith("datetime")) {
+                    int scale = 0;
+                    if (!type.equals("datetime")) {
+                        // regex datetime(s)
+                        Matcher matcher = FileFormatConstants.DATETIME_TYPE_PATTERN.matcher(type);
+                        if (!matcher.find()) {
+                            throw new AnalysisException("invalid datetime type: " + type);
+                        }
+                        scale = Integer.parseInt(matcher.group(1));
+                    }
+                    column = new Column(name, ScalarType.createDatetimeV2Type(scale), false, null, true, null, "");
+                } else if (type.equals("string")) {
+                    column = new Column(name, PrimitiveType.STRING, true);
+                } else if (type.equals("boolean")) {
+                    column = new Column(name, PrimitiveType.BOOLEAN, true);
+                } else {
+                    throw new AnalysisException("unsupported column type: " + type);
+                }
+                csvSchema.add(column);
+            }
+        } catch (Exception e) {
+            throw new AnalysisException("invalid csv schema: " + e.getMessage());
+        }
+    }
+}
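
    [For reference only, not part of this patch: a minimal usage sketch of the new utility class; the schema
    string and column names below are invented for illustration:]

    import org.apache.doris.catalog.Column;
    import org.apache.doris.common.AnalysisException;
    import org.apache.doris.common.util.FileFormatConstants;
    import org.apache.doris.common.util.FileFormatUtils;

    import java.util.ArrayList;
    import java.util.List;

    public class FileFormatUtilsSketch {
        public static void main(String[] args) throws AnalysisException {
            // isCsv() covers csv, csv_with_names, csv_with_names_and_types and hive_text.
            boolean isCsv = FileFormatUtils.isCsv(FileFormatConstants.FORMAT_CSV_WITH_NAMES); // true

            // parseCsvSchema() fills the list from a "name:type;..." schema string.
            List<Column> schema = new ArrayList<>();
            FileFormatUtils.parseCsvSchema(schema,
                    "id:bigint;price:decimal(10,2);created:datetime(6);note:string");
            // schema now holds 4 columns: bigint, decimalv3(10,2), datetimev2(6), string.
            System.out.println(isCsv + " / " + schema.size());
        }
    }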
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index 2cdc3f1972f..a6924f784fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -21,7 +21,6 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.qe.ConnectContext;
@@ -551,17 +550,18 @@ public class Util {
 
     public static TFileFormatType getFileFormatTypeFromName(String formatName) {
         String lowerFileFormat = Objects.requireNonNull(formatName).toLowerCase();
-        if (lowerFileFormat.equals("parquet")) {
+        if (lowerFileFormat.equals(FileFormatConstants.FORMAT_PARQUET)) {
             return TFileFormatType.FORMAT_PARQUET;
-        } else if (lowerFileFormat.equals("orc")) {
+        } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_ORC)) {
             return TFileFormatType.FORMAT_ORC;
-        } else if (lowerFileFormat.equals("json")) {
+        } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_JSON)) {
             return TFileFormatType.FORMAT_JSON;
             // csv/csv_with_name/csv_with_names_and_types treat as csv format
-        } else if (lowerFileFormat.equals(FeConstants.csv) || lowerFileFormat.equals(FeConstants.csv_with_names)
-                || lowerFileFormat.equals(FeConstants.csv_with_names_and_types)
+        } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_CSV)
+                || lowerFileFormat.equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
+                || lowerFileFormat.equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)
                 // TODO: Add TEXTFILE to TFileFormatType to Support hive text file format.
-                || lowerFileFormat.equals(FeConstants.text)) {
+                || lowerFileFormat.equals(FileFormatConstants.FORMAT_HIVE_TEXT)) {
             return TFileFormatType.FORMAT_CSV_PLAIN;
         } else {
             return TFileFormatType.FORMAT_UNKNOWN;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
index 6d7031f61c7..d9213360583 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
@@ -20,7 +20,7 @@ package org.apache.doris.planner;
 import org.apache.doris.analysis.OutFileClause;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.analysis.TupleId;
-import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.util.FileFormatConstants;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TExplainLevel;
@@ -65,8 +65,8 @@ public class ResultFileSink extends DataSink {
 
     public ResultFileSink(PlanNodeId exchNodeId, OutFileClause outFileClause, ArrayList<String> labels) {
         this(exchNodeId, outFileClause);
-        if (outFileClause.getHeaderType().equals(FeConstants.csv_with_names)
-                || outFileClause.getHeaderType().equals(FeConstants.csv_with_names_and_types)) {
+        if (outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
+                || outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) {
             header = genNames(labels, outFileClause.getColumnSeparator(), outFileClause.getLineDelimiter());
         }
         headerType = outFileClause.getHeaderType();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 67e09cbf205..7b57d9b0454 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -292,7 +292,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
         for (Split split : inputSplits) {
             FileSplit fileSplit = (FileSplit) split;
             TFileType locationType = getLocationType(fileSplit.getPath().toString());
-            setLocationPropertiesIfNecessary(locationType, fileSplit, locationProperties);
+            setLocationPropertiesIfNecessary(locationType, locationProperties);
 
             TScanRangeLocations curLocations = newLocations();
             // If fileSplit has partition values, use the values collected from hive partitions.
@@ -368,7 +368,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
                 scanRangeLocations.size(), (System.currentTimeMillis() - start));
     }
 
-    private void setLocationPropertiesIfNecessary(TFileType locationType, FileSplit fileSplit,
+    private void setLocationPropertiesIfNecessary(TFileType locationType,
             Map<String, String> locationProperties) throws UserException {
         if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
             if (!params.isSetHdfsParams()) {
@@ -455,13 +455,6 @@ public abstract class FileQueryScanNode extends FileScanNode {
 
     protected abstract Map<String, String> getLocationProperties() throws UserException;
 
-    // eg: hdfs://namenode  s3://buckets
-    protected String getFsName(FileSplit split) {
-        String fullPath = split.getPath().toUri().toString();
-        String filePath = split.getPath().toUri().getPath();
-        return fullPath.replace(filePath, "");
-    }
-
     protected static Optional<TFileType> getTFileType(String location) {
         if (location != null && !location.isEmpty()) {
             if (S3Util.isObjStorage(location)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index d36958ad88b..52bb119d7a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -27,9 +27,9 @@ import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.FileFormatConstants;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.Load;
@@ -131,8 +131,8 @@ public class LoadScanProvider {
 
     private String getHeaderType(String formatType) {
         if (formatType != null) {
-            if (formatType.equalsIgnoreCase(FeConstants.csv_with_names) || formatType.equalsIgnoreCase(
-                    FeConstants.csv_with_names_and_types)) {
+            if (formatType.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
+                    || formatType.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) {
                 return formatType;
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
index c069aa43c30..6cdd9d9834f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
@@ -81,10 +81,6 @@ public class TVFScanNode extends FileQueryScanNode {
         numNodes = backendPolicy.numBackends();
     }
 
-    protected String getFsName(FileSplit split) {
-        return tableValuedFunction.getFsName();
-    }
-
     @Override
     public TFileAttributes getFileAttributes() throws UserException {
         return tableValuedFunction.getFileAttributes();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 9ab79eecaaa..8fddb508466 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -32,12 +32,12 @@ import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.common.util.FileFormatConstants;
+import org.apache.doris.common.util.FileFormatUtils;
 import org.apache.doris.common.util.Util;
-import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.external.TVFScanNode;
@@ -67,6 +67,7 @@ import org.apache.doris.thrift.TTextSerdeType;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -80,8 +81,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 /**
@@ -89,43 +88,22 @@ import java.util.stream.Collectors;
  */
 public abstract class ExternalFileTableValuedFunction extends TableValuedFunctionIf {
     public static final Logger LOG = LogManager.getLogger(ExternalFileTableValuedFunction.class);
-    protected static String DEFAULT_COLUMN_SEPARATOR = ",";
-    protected static String DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR = "\001";
-    protected static final String DEFAULT_LINE_DELIMITER = "\n";
-    public static final String FORMAT = "format";
-    public static final String COLUMN_SEPARATOR = "column_separator";
-    public static final String LINE_DELIMITER = "line_delimiter";
-    protected static final String JSON_ROOT = "json_root";
-    protected static final String JSON_PATHS = "jsonpaths";
-    protected static final String STRIP_OUTER_ARRAY = "strip_outer_array";
-    protected static final String READ_JSON_BY_LINE = "read_json_by_line";
-    protected static final String NUM_AS_STRING = "num_as_string";
-    protected static final String FUZZY_PARSE = "fuzzy_parse";
-    protected static final String TRIM_DOUBLE_QUOTES = "trim_double_quotes";
-    protected static final String SKIP_LINES = "skip_lines";
-    protected static final String CSV_SCHEMA = "csv_schema";
-    protected static final String COMPRESS_TYPE = "compress_type";
-    public static final String PATH_PARTITION_KEYS = "path_partition_keys";
-    // decimal(p,s)
-    private static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)");
-    // datetime(p)
-    private static final Pattern DATETIME_TYPE_PATTERN = Pattern.compile("datetime\\((\\d+)\\)");
 
     protected static final ImmutableSet<String> FILE_FORMAT_PROPERTIES = new ImmutableSet.Builder<String>()
-            .add(FORMAT)
-            .add(JSON_ROOT)
-            .add(JSON_PATHS)
-            .add(STRIP_OUTER_ARRAY)
-            .add(READ_JSON_BY_LINE)
-            .add(NUM_AS_STRING)
-            .add(FUZZY_PARSE)
-            .add(COLUMN_SEPARATOR)
-            .add(LINE_DELIMITER)
-            .add(TRIM_DOUBLE_QUOTES)
-            .add(SKIP_LINES)
-            .add(CSV_SCHEMA)
-            .add(COMPRESS_TYPE)
-            .add(PATH_PARTITION_KEYS)
+            .add(FileFormatConstants.PROP_FORMAT)
+            .add(FileFormatConstants.PROP_JSON_ROOT)
+            .add(FileFormatConstants.PROP_JSON_PATHS)
+            .add(FileFormatConstants.PROP_STRIP_OUTER_ARRAY)
+            .add(FileFormatConstants.PROP_READ_JSON_BY_LINE)
+            .add(FileFormatConstants.PROP_NUM_AS_STRING)
+            .add(FileFormatConstants.PROP_FUZZY_PARSE)
+            .add(FileFormatConstants.PROP_COLUMN_SEPARATOR)
+            .add(FileFormatConstants.PROP_LINE_DELIMITER)
+            .add(FileFormatConstants.PROP_TRIM_DOUBLE_QUOTES)
+            .add(FileFormatConstants.PROP_SKIP_LINES)
+            .add(FileFormatConstants.PROP_CSV_SCHEMA)
+            .add(FileFormatConstants.PROP_COMPRESS_TYPE)
+            .add(FileFormatConstants.PROP_PATH_PARTITION_KEYS)
             .build();
 
     // Columns got from file and path(if has)
@@ -137,17 +115,16 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
     private List<String> pathPartitionKeys;
 
     protected List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
-    protected Map<String, String> locationProperties;
+    protected Map<String, String> locationProperties = Maps.newHashMap();
     protected String filePath;
 
-
-    private TFileFormatType fileFormatType;
+    protected TFileFormatType fileFormatType;
     private TFileCompressType compressionType;
     private String headerType = "";
 
     private TTextSerdeType textSerdeType = TTextSerdeType.JSON_TEXT_SERDE;
-    private String columnSeparator = DEFAULT_COLUMN_SEPARATOR;
-    private String lineDelimiter = DEFAULT_LINE_DELIMITER;
+    private String columnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR;
+    private String lineDelimiter = FileFormatConstants.DEFAULT_LINE_DELIMITER;
     private String jsonRoot = "";
     private String jsonPaths = "";
     private boolean stripOuterArray;
@@ -175,20 +152,6 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         return locationProperties;
     }
 
-    public List<Column> getCsvSchema() {
-        return csvSchema;
-    }
-
-    public String getFsName() {
-        TFileType fileType = getTFileType();
-        if (fileType == TFileType.FILE_HDFS) {
-            return locationProperties.get(HdfsResource.HADOOP_FS_NAME);
-        } else if (fileType == TFileType.FILE_S3) {
-            return locationProperties.get(S3Properties.ENDPOINT);
-        }
-        return "";
-    }
-
     public List<String> getPathPartitionKeys() {
         return pathPartitionKeys;
     }
@@ -203,25 +166,29 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         }
     }
 
-    //The keys in the passed validParams map need to be lowercase.
-    protected void parseProperties(Map<String, String> validParams) throws AnalysisException {
-        String formatString = validParams.getOrDefault(FORMAT, "");
-        String defaultColumnSeparator = DEFAULT_COLUMN_SEPARATOR;
+    //The keys in properties map need to be lowercase.
+    protected Map<String, String> parseCommonProperties(Map<String, String> properties) throws AnalysisException {
+        // Copy the properties, because we will remove the key from properties.
+        Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        copiedProps.putAll(properties);
+
+        String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, "");
+        String defaultColumnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR;
         switch (formatString) {
             case "csv":
                 this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
                 break;
             case "hive_text":
                 this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                defaultColumnSeparator = DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR;
+                defaultColumnSeparator = FileFormatConstants.DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR;
                 this.textSerdeType = TTextSerdeType.HIVE_TEXT_SERDE;
                 break;
             case "csv_with_names":
-                this.headerType = FeConstants.csv_with_names;
+                this.headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES;
                 this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
                 break;
             case "csv_with_names_and_types":
-                this.headerType = FeConstants.csv_with_names_and_types;
+                this.headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES;
                 this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
                 break;
             case "parquet":
@@ -240,113 +207,61 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
                 throw new AnalysisException("format:" + formatString + " is not supported.");
         }
 
-        columnSeparator = validParams.getOrDefault(COLUMN_SEPARATOR, defaultColumnSeparator);
+        columnSeparator = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_COLUMN_SEPARATOR,
+                defaultColumnSeparator);
         if (Strings.isNullOrEmpty(columnSeparator)) {
             throw new AnalysisException("column_separator can not be empty.");
         }
         columnSeparator = Separator.convertSeparator(columnSeparator);
 
-        lineDelimiter = validParams.getOrDefault(LINE_DELIMITER, DEFAULT_LINE_DELIMITER);
+        lineDelimiter = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_LINE_DELIMITER,
+                FileFormatConstants.DEFAULT_LINE_DELIMITER);
         if (Strings.isNullOrEmpty(lineDelimiter)) {
             throw new AnalysisException("line_delimiter can not be empty.");
         }
         lineDelimiter = Separator.convertSeparator(lineDelimiter);
 
-        jsonRoot = validParams.getOrDefault(JSON_ROOT, "");
-        jsonPaths = validParams.getOrDefault(JSON_PATHS, "");
-        readJsonByLine = Boolean.valueOf(validParams.get(READ_JSON_BY_LINE)).booleanValue();
-        stripOuterArray = Boolean.valueOf(validParams.get(STRIP_OUTER_ARRAY)).booleanValue();
-        numAsString = Boolean.valueOf(validParams.get(NUM_AS_STRING)).booleanValue();
-        fuzzyParse = Boolean.valueOf(validParams.get(FUZZY_PARSE)).booleanValue();
-        trimDoubleQuotes = Boolean.valueOf(validParams.get(TRIM_DOUBLE_QUOTES)).booleanValue();
-        skipLines = Integer.valueOf(validParams.getOrDefault(SKIP_LINES, "0")).intValue();
-
+        jsonRoot = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_JSON_ROOT, "");
+        jsonPaths = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_JSON_PATHS, "");
+        readJsonByLine = Boolean.valueOf(
+                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_READ_JSON_BY_LINE, "")).booleanValue();
+        stripOuterArray = Boolean.valueOf(
+                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_STRIP_OUTER_ARRAY, "")).booleanValue();
+        numAsString = Boolean.valueOf(
+                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_NUM_AS_STRING, "")).booleanValue();
+        fuzzyParse = Boolean.valueOf(
+                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FUZZY_PARSE, "")).booleanValue();
+        trimDoubleQuotes = Boolean.valueOf(
+                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_TRIM_DOUBLE_QUOTES, "")).booleanValue();
+        skipLines = Integer.valueOf(
+                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_SKIP_LINES, "0")).intValue();
+
+        String compressTypeStr = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_COMPRESS_TYPE, "UNKNOWN");
         try {
-            compressionType = Util.getFileCompressType(validParams.getOrDefault(COMPRESS_TYPE, "UNKNOWN"));
+            compressionType = Util.getFileCompressType(compressTypeStr);
         } catch (IllegalArgumentException e) {
-            throw new AnalysisException("Compress type : " + validParams.get(COMPRESS_TYPE) + " is not supported.");
+            throw new AnalysisException("Compress type : " +  compressTypeStr + " is not supported.");
         }
-        if (formatString.equals("csv") || formatString.equals("csv_with_names")
-                || formatString.equals("csv_with_names_and_types")) {
-            parseCsvSchema(csvSchema, validParams);
+        if (FileFormatUtils.isCsv(formatString)) {
+            FileFormatUtils.parseCsvSchema(csvSchema, getOrDefaultAndRemove(copiedProps,
+                    FileFormatConstants.PROP_CSV_SCHEMA, ""));
+            LOG.debug("get csv schema: {}", csvSchema);
         }
-        pathPartitionKeys = Optional.ofNullable(validParams.get(PATH_PARTITION_KEYS))
-                .map(str ->
-                        Arrays.stream(str.split(","))
-                                .map(String::trim)
-                                .collect(Collectors.toList()))
+
+        pathPartitionKeys = Optional.ofNullable(
+                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_PATH_PARTITION_KEYS, null))
+                .map(str -> Arrays.stream(str.split(","))
+                        .map(String::trim)
+                        .collect(Collectors.toList()))
                 .orElse(Lists.newArrayList());
+
+        return copiedProps;
     }
 
-    // public for unit test
-    public static void parseCsvSchema(List<Column> csvSchema, Map<String, String> validParams)
-            throws AnalysisException {
-        String csvSchemaStr = validParams.get(CSV_SCHEMA);
-        if (Strings.isNullOrEmpty(csvSchemaStr)) {
-            return;
-        }
-        // the schema str is like: "k1:int;k2:bigint;k3:varchar(20);k4:datetime(6)"
-        String[] schemaStrs = csvSchemaStr.split(";");
-        try {
-            for (String schemaStr : schemaStrs) {
-                String[] kv = schemaStr.replace(" ", "").split(":");
-                if (kv.length != 2) {
-                    throw new AnalysisException("invalid csv schema: " + csvSchemaStr);
-                }
-                Column column = null;
-                String name = kv[0].toLowerCase();
-                FeNameFormat.checkColumnName(name);
-                String type = kv[1].toLowerCase();
-                if (type.equals("tinyint")) {
-                    column = new Column(name, PrimitiveType.TINYINT, true);
-                } else if (type.equals("smallint")) {
-                    column = new Column(name, PrimitiveType.SMALLINT, true);
-                } else if (type.equals("int")) {
-                    column = new Column(name, PrimitiveType.INT, true);
-                } else if (type.equals("bigint")) {
-                    column = new Column(name, PrimitiveType.BIGINT, true);
-                } else if (type.equals("largeint")) {
-                    column = new Column(name, PrimitiveType.LARGEINT, true);
-                } else if (type.equals("float")) {
-                    column = new Column(name, PrimitiveType.FLOAT, true);
-                } else if (type.equals("double")) {
-                    column = new Column(name, PrimitiveType.DOUBLE, true);
-                } else if (type.startsWith("decimal")) {
-                    // regex decimal(p, s)
-                    Matcher matcher = DECIMAL_TYPE_PATTERN.matcher(type);
-                    if (!matcher.find()) {
-                        throw new AnalysisException("invalid decimal type: " + type);
-                    }
-                    int precision = Integer.parseInt(matcher.group(1));
-                    int scale = Integer.parseInt(matcher.group(2));
-                    column = new Column(name, ScalarType.createDecimalV3Type(precision, scale), false, null, true, null,
-                            "");
-                } else if (type.equals("date")) {
-                    column = new Column(name, ScalarType.createDateType(), false, null, true, null, "");
-                } else if (type.startsWith("datetime")) {
-                    int scale = 0;
-                    if (!type.equals("datetime")) {
-                        // regex datetime(s)
-                        Matcher matcher = DATETIME_TYPE_PATTERN.matcher(type);
-                        if (!matcher.find()) {
-                            throw new AnalysisException("invalid datetime type: " + type);
-                        }
-                        scale = Integer.parseInt(matcher.group(1));
-                    }
-                    column = new Column(name, ScalarType.createDatetimeV2Type(scale), false, null, true, null, "");
-                } else if (type.equals("string")) {
-                    column = new Column(name, PrimitiveType.STRING, true);
-                } else if (type.equals("boolean")) {
-                    column = new Column(name, PrimitiveType.BOOLEAN, true);
-                } else {
-                    throw new AnalysisException("unsupported column type: " + type);
-                }
-                csvSchema.add(column);
-            }
-            LOG.debug("get csv schema: {}", csvSchema);
-        } catch (Exception e) {
-            throw new AnalysisException("invalid csv schema: " + e.getMessage());
-        }
+    protected String getOrDefaultAndRemove(Map<String, String> props, String key, String defaultValue) {
+        String value = props.getOrDefault(key, defaultValue);
+        props.remove(key);
+        return value;
     }
 
     public List<TBrokerFileStatus> getFileStatuses() {
@@ -541,3 +456,4 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
 }
 
 
+
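
    [Sketch only, not part of this patch: the refactor above replaces parseProperties(validParams) with
    parseCommonProperties(properties), which consumes the shared file-format keys from a case-insensitive
    copy and returns the leftover keys to the subclass; the HDFS, http_stream and local TVFs below treat
    those leftovers as location properties. The snippet mirrors parseCommonProperties()/getOrDefaultAndRemove()
    rather than calling them, since those members are protected, and the property values are invented:]

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    public class PropertySplitSketch {
        // Same idea as getOrDefaultAndRemove(): read a key with a default, then consume it.
        static String takeOrDefault(Map<String, String> props, String key, String def) {
            String value = props.getOrDefault(key, def);
            props.remove(key);
            return value;
        }

        public static void main(String[] args) {
            Map<String, String> raw = new HashMap<>();
            raw.put("FORMAT", "csv");                // mixed case on purpose
            raw.put("column_separator", "|");
            raw.put("uri", "hdfs://host:8020/path"); // a source-specific key

            // Case-insensitive working copy, as in parseCommonProperties().
            Map<String, String> copied = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
            copied.putAll(raw);

            String format = takeOrDefault(copied, "format", "");          // "csv"
            String sep = takeOrDefault(copied, "column_separator", "\t"); // "|"

            // The leftover entries are what the caller gets back (here: only "uri").
            System.out.println(format + " / " + sep + " / leftover: " + copied);
        }
    }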
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java
index eb8e8f70f7b..051706ae474 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java
@@ -18,16 +18,14 @@
 package org.apache.doris.tablefunction;
 
 import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.analysis.ExportStmt;
+import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.analysis.StorageBackend.StorageType;
 import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.util.URI;
 import org.apache.doris.thrift.TFileType;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import org.apache.commons.collections.map.CaseInsensitiveMap;
+import com.google.common.base.Strings;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -39,50 +37,41 @@ import java.util.Map;
  */
 public class HdfsTableValuedFunction extends ExternalFileTableValuedFunction {
     public static final Logger LOG = LogManager.getLogger(HdfsTableValuedFunction.class);
-
     public static final String NAME = "hdfs";
-    public static final String HDFS_URI = "uri";
-    // simple or kerberos
+    private static final String PROP_URI = "uri";
+
+    public HdfsTableValuedFunction(Map<String, String> properties) throws AnalysisException {
+        init(properties);
+    }
 
-    private static final ImmutableSet<String> LOCATION_PROPERTIES = new ImmutableSet.Builder<String>()
-            .add(HDFS_URI)
-            .add(HdfsResource.HADOOP_SECURITY_AUTHENTICATION)
-            .add(HdfsResource.HADOOP_FS_NAME)
-            .add(HdfsResource.HADOOP_USER_NAME)
-            .add(HdfsResource.HADOOP_KERBEROS_PRINCIPAL)
-            .add(HdfsResource.HADOOP_KERBEROS_KEYTAB)
-            .add(HdfsResource.HADOOP_SHORT_CIRCUIT)
-            .add(HdfsResource.HADOOP_SOCKET_PATH)
-            .build();
+    private void init(Map<String, String> properties) throws AnalysisException {
+        // 1. analyze common properties
+        Map<String, String> otherProps = super.parseCommonProperties(properties);
 
-    private URI hdfsUri;
+        // 2. analyze uri
+        String uriStr = getOrDefaultAndRemove(otherProps, PROP_URI, null);
+        if (Strings.isNullOrEmpty(uriStr)) {
+            throw new AnalysisException(String.format("Properties '%s' is required.", PROP_URI));
+        }
+        URI uri = URI.create(uriStr);
+        StorageBackend.checkUri(uri, StorageType.HDFS);
+        filePath = uri.getScheme() + "://" + uri.getAuthority() + uri.getPath();
 
-    public HdfsTableValuedFunction(Map<String, String> params) throws AnalysisException {
-        Map<String, String> fileParams = new CaseInsensitiveMap();
-        locationProperties = Maps.newHashMap();
-        for (String key : params.keySet()) {
-            String lowerKey = key.toLowerCase();
-            if (FILE_FORMAT_PROPERTIES.contains(lowerKey)) {
-                fileParams.put(lowerKey, params.get(key));
-            } else if (LOCATION_PROPERTIES.contains(lowerKey)) {
-                locationProperties.put(lowerKey, params.get(key));
-            } else if (HdfsResource.HADOOP_FS_NAME.equalsIgnoreCase(key)) {
+        // 3. analyze other properties
+        for (String key : otherProps.keySet()) {
+            if (HdfsResource.HADOOP_FS_NAME.equalsIgnoreCase(key)) {
                 // because HADOOP_FS_NAME contains upper and lower case
-                locationProperties.put(HdfsResource.HADOOP_FS_NAME, params.get(key));
+                locationProperties.put(HdfsResource.HADOOP_FS_NAME, otherProps.get(key));
             } else {
-                locationProperties.put(key, params.get(key));
+                locationProperties.put(key, otherProps.get(key));
             }
         }
-
-        if (!locationProperties.containsKey(HDFS_URI)) {
-            throw new AnalysisException(String.format("Configuration '%s' is required.", HDFS_URI));
+        // If the user does not specify the HADOOP_FS_NAME, we will use the uri's scheme and authority
+        if (!locationProperties.containsKey(HdfsResource.HADOOP_FS_NAME)) {
+            locationProperties.put(HdfsResource.HADOOP_FS_NAME, uri.getScheme() + "://" + uri.getAuthority());
         }
-        ExportStmt.checkPath(locationProperties.get(HDFS_URI), StorageType.HDFS);
-        hdfsUri = URI.create(locationProperties.get(HDFS_URI));
-        filePath = locationProperties.get(HdfsResource.HADOOP_FS_NAME) + hdfsUri.getPath();
-
-        super.parseProperties(fileParams);
 
+        // 4. parse file
         parseFile();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java
index 265045d7a6f..5044f045c31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java
@@ -20,9 +20,9 @@ package org.apache.doris.tablefunction;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.StorageBackend.StorageType;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
 
-import org.apache.commons.collections.map.CaseInsensitiveMap;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -36,24 +36,15 @@ public class HttpStreamTableValuedFunction extends ExternalFileTableValuedFuncti
     private static final Logger LOG = LogManager.getLogger(HttpStreamTableValuedFunction.class);
     public static final String NAME = "http_stream";
 
-    public HttpStreamTableValuedFunction(Map<String, String> params) throws AnalysisException {
-        Map<String, String> fileParams = new CaseInsensitiveMap();
-        for (String key : params.keySet()) {
-            String lowerKey = key.toLowerCase();
-            if (!FILE_FORMAT_PROPERTIES.contains(lowerKey)) {
-                throw new AnalysisException(key + " is invalid property");
-            }
-            fileParams.put(lowerKey, params.get(key).toLowerCase());
-        }
+    public HttpStreamTableValuedFunction(Map<String, String> properties) throws AnalysisException {
+        // 1. analyze common properties
+        super.parseCommonProperties(properties);
 
-        String formatString = fileParams.getOrDefault(FORMAT, "").toLowerCase();
-        if (formatString.equals("parquet")
-                || formatString.equals("avro")
-                || formatString.equals("orc")) {
-            throw new AnalysisException("current http_stream does not yet support parquet, avro and orc");
+        if (fileFormatType == TFileFormatType.FORMAT_PARQUET
+                || fileFormatType == TFileFormatType.FORMAT_AVRO
+                || fileFormatType == TFileFormatType.FORMAT_ORC) {
+            throw new AnalysisException("http_stream does not yet support parquet, avro and orc");
         }
-
-        super.parseProperties(fileParams);
     }
 
     // =========== implement abstract methods of ExternalFileTableValuedFunction =================
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java
index 129c3f930c7..350621e3550 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java
@@ -31,8 +31,6 @@ import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TNetworkAddress;
 
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import org.apache.commons.collections.map.CaseInsensitiveMap;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -46,42 +44,31 @@ import java.util.concurrent.TimeUnit;
  */
 public class LocalTableValuedFunction extends ExternalFileTableValuedFunction {
     private static final Logger LOG = LogManager.getLogger(LocalTableValuedFunction.class);
-
     public static final String NAME = "local";
-    public static final String FILE_PATH = "file_path";
-    public static final String BACKEND_ID = "backend_id";
+    public static final String PROP_FILE_PATH = "file_path";
+    public static final String PROP_BACKEND_ID = "backend_id";
 
     private static final ImmutableSet<String> LOCATION_PROPERTIES = new ImmutableSet.Builder<String>()
-            .add(FILE_PATH)
-            .add(BACKEND_ID)
+            .add(PROP_FILE_PATH)
+            .add(PROP_BACKEND_ID)
             .build();
 
     private long backendId;
 
-    public LocalTableValuedFunction(Map<String, String> params) throws AnalysisException {
-        Map<String, String> fileParams = new CaseInsensitiveMap();
-        locationProperties = Maps.newHashMap();
-        for (String key : params.keySet()) {
-            String lowerKey = key.toLowerCase();
-            if (FILE_FORMAT_PROPERTIES.contains(lowerKey)) {
-                fileParams.put(lowerKey, params.get(key));
-            } else if (LOCATION_PROPERTIES.contains(lowerKey)) {
-                locationProperties.put(lowerKey, params.get(key));
-            } else {
-                throw new AnalysisException(key + " is invalid property");
-            }
-        }
+    public LocalTableValuedFunction(Map<String, String> properties) throws AnalysisException {
+        // 1. analyze common properties
+        Map<String, String> otherProps = super.parseCommonProperties(properties);
 
+        // 2. analyze location properties
         for (String key : LOCATION_PROPERTIES) {
-            if (!locationProperties.containsKey(key)) {
-                throw new AnalysisException(String.format("Configuration '%s' is required.", key));
+            if (!otherProps.containsKey(key)) {
+                throw new AnalysisException(String.format("Property '%s' is required.", key));
             }
         }
+        filePath = otherProps.get(PROP_FILE_PATH);
+        backendId = Long.parseLong(otherProps.get(PROP_BACKEND_ID));
 
-        filePath = locationProperties.get(FILE_PATH);
-        backendId = Long.parseLong(locationProperties.get(BACKEND_ID));
-        super.parseProperties(fileParams);
-
+        // 3. parse file
         getFileListFromBackend();
     }
 
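    For illustration only (not part of the commit): local() now takes file_path and backend_id from the
    leftovers of parseCommonProperties() and treats both as required. A minimal sketch, with the path and
    backend id as placeholders:

        SELECT * FROM local(
            "file_path" = "demo/student.csv",   -- placeholder path on the selected backend
            "backend_id" = "10002",             -- placeholder backend id (see SHOW BACKENDS)
            "format" = "csv",
            "column_separator" = ",");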
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
index 504730daaee..9ad6232c4e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
@@ -30,10 +30,9 @@ import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.thrift.TFileType;
 
-import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 
-import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -49,71 +48,46 @@ import java.util.Map;
  */
 public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
     public static final String NAME = "s3";
-    public static final String S3_URI = "uri";
+    public static final String PROP_URI = "uri";
 
     private static final ImmutableSet<String> DEPRECATED_KEYS =
-            ImmutableSet.of("access_key", "secret_key", "session_token", "region");
-
-    private static final ImmutableSet<String> OPTIONAL_KEYS =
-            ImmutableSet.of(S3Properties.SESSION_TOKEN, PropertyConverter.USE_PATH_STYLE, S3Properties.REGION,
-                    PATH_PARTITION_KEYS);
-
-    private static final ImmutableSet<String> LOCATION_PROPERTIES = ImmutableSet.<String>builder()
-            .add(S3_URI)
-            .add(S3Properties.ENDPOINT)
-            .addAll(DEPRECATED_KEYS)
-            .addAll(S3Properties.TVF_REQUIRED_FIELDS)
-            .addAll(OPTIONAL_KEYS)
-            .build();
-
-    private final S3URI s3uri;
-    private final boolean forceVirtualHosted;
-    private String virtualBucket = "";
+            ImmutableSet.of("access_key", "secret_key", "session_token", "region",
+                    "ACCESS_KEY", "SECRET_KEY", "SESSION_TOKEN", "REGION");
 
-    public S3TableValuedFunction(Map<String, String> params) throws AnalysisException {
+    private String virtualBucket = "";
 
-        Map<String, String> fileParams = new HashMap<>();
-        for (Map.Entry<String, String> entry : params.entrySet()) {
-            String key = entry.getKey();
-            String lowerKey = key.toLowerCase();
-            if (!LOCATION_PROPERTIES.contains(lowerKey) && !FILE_FORMAT_PROPERTIES.contains(lowerKey)) {
-                throw new AnalysisException("Invalid property: " + key);
-            }
-            if (DEPRECATED_KEYS.contains(lowerKey)) {
-                lowerKey = S3Properties.S3_PREFIX + lowerKey;
-            }
-            fileParams.put(lowerKey, entry.getValue());
-        }
+    public S3TableValuedFunction(Map<String, String> properties) throws AnalysisException {
+        // 1. analyze common properties
+        Map<String, String> otherProps = super.parseCommonProperties(properties);
 
-        if (!fileParams.containsKey(S3_URI)) {
-            throw new AnalysisException("Missing required property: " + S3_URI);
+        // 2. analyze uri and other properties
+        String uriStr = getOrDefaultAndRemove(otherProps, PROP_URI, null);
+        if (Strings.isNullOrEmpty(uriStr)) {
+            throw new AnalysisException(String.format("Properties '%s' is required.", PROP_URI));
         }
-
-        forceVirtualHosted = isVirtualHosted(fileParams);
-        s3uri = getS3Uri(fileParams);
-        final String endpoint = forceVirtualHosted
-                ? getEndpointAndSetVirtualBucket(params)
-                : s3uri.getBucketScheme();
-        if (!fileParams.containsKey(S3Properties.REGION)) {
+        forwardCompatibleDeprecatedKeys(otherProps);
+
+        String usePathStyle = getOrDefaultAndRemove(otherProps, PropertyConverter.USE_PATH_STYLE, "false");
+        boolean forceVirtualHosted = isVirtualHosted(uriStr, Boolean.parseBoolean(usePathStyle));
+        S3URI s3uri = getS3Uri(uriStr, forceVirtualHosted);
+        String endpoint = forceVirtualHosted
+                ? getEndpointAndSetVirtualBucket(s3uri, otherProps) : s3uri.getBucketScheme();
+        if (!otherProps.containsKey(S3Properties.REGION)) {
             String region = S3Properties.getRegionOfEndpoint(endpoint);
-            fileParams.put(S3Properties.REGION, region);
+            otherProps.put(S3Properties.REGION, region);
         }
+        checkNecessaryS3Properties(otherProps);
         CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(endpoint,
-                fileParams.get(S3Properties.REGION),
-                fileParams.get(S3Properties.ACCESS_KEY),
-                fileParams.get(S3Properties.SECRET_KEY));
-        if (fileParams.containsKey(S3Properties.SESSION_TOKEN)) {
-            credential.setSessionToken(fileParams.get(S3Properties.SESSION_TOKEN));
+                otherProps.get(S3Properties.REGION),
+                otherProps.get(S3Properties.ACCESS_KEY),
+                otherProps.get(S3Properties.SECRET_KEY));
+        if (otherProps.containsKey(S3Properties.SESSION_TOKEN)) {
+            credential.setSessionToken(otherProps.get(S3Properties.SESSION_TOKEN));
         }
 
-        // set S3 location properties
-        // these five properties is necessary, no one can be lost.
         locationProperties = S3Properties.credentialToMap(credential);
-        String usePathStyle = fileParams.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false");
         locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle);
-        this.locationProperties.putAll(S3ClientBEProperties.getBeFSProperties(this.locationProperties));
-
-        super.parseProperties(fileParams);
+        locationProperties.putAll(S3ClientBEProperties.getBeFSProperties(locationProperties));
 
         if (forceVirtualHosted) {
             filePath = NAME + S3URI.SCHEME_DELIM + virtualBucket + S3URI.PATH_DELIM
@@ -130,39 +104,59 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
         }
     }
 
-    private String getEndpointAndSetVirtualBucket(Map<String, String> params) throws AnalysisException {
-        Preconditions.checkState(forceVirtualHosted, "only invoked when force virtual hosted.");
-        String[] fileds = s3uri.getVirtualBucket().split("\\.", 2);
-        virtualBucket = fileds[0];
-        if (fileds.length > 1) {
+    private void forwardCompatibleDeprecatedKeys(Map<String, String> props) {
+        for (String deprecatedKey : DEPRECATED_KEYS) {
+            String value = props.remove(deprecatedKey);
+            if (!Strings.isNullOrEmpty(value)) {
+                props.put("s3." + deprecatedKey.toLowerCase(), value);
+            }
+        }
+    }
+
+    private void checkNecessaryS3Properties(Map<String, String> props) throws AnalysisException {
+        if (Strings.isNullOrEmpty(props.get(S3Properties.REGION))) {
+            throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.REGION));
+        }
+        if (Strings.isNullOrEmpty(props.get(S3Properties.ACCESS_KEY))) {
+            throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ACCESS_KEY));
+        }
+        if (Strings.isNullOrEmpty(props.get(S3Properties.SECRET_KEY))) {
+            throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.SECRET_KEY));
+        }
+    }
+
+    private String getEndpointAndSetVirtualBucket(S3URI s3uri, Map<String, String> props)
+            throws AnalysisException {
+        String[] fields = s3uri.getVirtualBucket().split("\\.", 2);
+        virtualBucket = fields[0];
+        if (fields.length > 1) {
             // At this point, s3uri.getVirtualBucket() is: virtualBucket.endpoint, Eg:
             //          uri: http://my_bucket.cos.ap-beijing.myqcloud.com/file.txt
             // s3uri.getVirtualBucket() = my_bucket.cos.ap-beijing.myqcloud.com,
             // so we need separate virtualBucket and endpoint.
-            return fileds[1];
-        } else if (params.containsKey(S3Properties.ENDPOINT)) {
-            return params.get(S3Properties.ENDPOINT);
+            return fields[1];
+        } else if (props.containsKey(S3Properties.ENDPOINT)) {
+            return props.get(S3Properties.ENDPOINT);
         } else {
             throw new AnalysisException("can not parse endpoint, please check uri.");
         }
     }
 
-    private boolean isVirtualHosted(Map<String, String> validParams) {
-        String originUri = validParams.getOrDefault(S3_URI, "");
-        if (originUri.toLowerCase().startsWith("s3")) {
+    private boolean isVirtualHosted(String uri, boolean usePathStyle) {
+        if (uri.toLowerCase().startsWith("s3")) {
             // s3 protocol, default virtual-hosted style
             return true;
         } else {
             // not s3 protocol, forceVirtualHosted is determined by USE_PATH_STYLE.
-            return !Boolean.parseBoolean(validParams.get(PropertyConverter.USE_PATH_STYLE));
+            return !usePathStyle;
         }
     }
 
-    private S3URI getS3Uri(Map<String, String> validParams) throws AnalysisException {
+    private S3URI getS3Uri(String uri, boolean forceVirtualHosted) throws AnalysisException {
         try {
-            return S3URI.create(validParams.get(S3_URI), forceVirtualHosted);
+            return S3URI.create(uri, forceVirtualHosted);
         } catch (UserException e) {
-            throw new AnalysisException("parse s3 uri failed, uri = " + validParams.get(S3_URI), e);
+            throw new AnalysisException("parse s3 uri failed, uri = " + uri, e);
         }
     }
 
@@ -189,3 +183,4 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
         return "S3TableValuedFunction";
     }
 }
+
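    For illustration only (not part of the commit): the bare access_key/secret_key/session_token/region keys
    (upper or lower case) are now rewritten to their "s3."-prefixed equivalents, and region, access key and
    secret key are validated up front. A minimal sketch using the new-style keys; the bucket, endpoint and all
    values below are placeholders, and the exact "s3.*" key strings follow the S3Properties constants:

        SELECT * FROM s3(
            "uri" = "https://my-bucket.s3.example-region.amazonaws.com/demo/data.csv",  -- placeholder
            "s3.access_key" = "ak",
            "s3.secret_key" = "sk",
            "s3.region" = "example-region",
            "format" = "csv",
            "use_path_style" = "false");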
diff --git a/fe/fe-core/src/test/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunctionTest.java
index f664415e6d5..e5b06bd5dd4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunctionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunctionTest.java
@@ -21,6 +21,8 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.util.FileFormatConstants;
+import org.apache.doris.common.util.FileFormatUtils;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -35,12 +37,12 @@ public class ExternalFileTableValuedFunctionTest {
     public void testCsvSchemaParse() {
         Config.enable_date_conversion = true;
         Map<String, String> properties = Maps.newHashMap();
-        properties.put(ExternalFileTableValuedFunction.CSV_SCHEMA,
+        properties.put(FileFormatConstants.PROP_CSV_SCHEMA,
                 "k1:int;k2:bigint;k3:float;k4:double;k5:smallint;k6:tinyint;k7:bool;"
                         + "k8:char(10);k9:varchar(20);k10:date;k11:datetime;k12:decimal(10,2)");
         List<Column> csvSchema = Lists.newArrayList();
         try {
-            ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, properties);
+            FileFormatUtils.parseCsvSchema(csvSchema, properties.get(FileFormatConstants.PROP_CSV_SCHEMA));
             Assert.fail();
         } catch (AnalysisException e) {
             e.printStackTrace();
@@ -48,11 +50,11 @@ public class ExternalFileTableValuedFunctionTest {
         }
 
         csvSchema.clear();
-        properties.put(ExternalFileTableValuedFunction.CSV_SCHEMA,
+        properties.put(FileFormatConstants.PROP_CSV_SCHEMA,
                 "k1:int;k2:bigint;k3:float;k4:double;k5:smallint;k6:tinyint;k7:boolean;"
                         + "k8:string;k9:date;k10:datetime;k11:decimal(10, 2);k12:decimal( 38,10); k13:datetime(5)");
         try {
-            ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, properties);
+            FileFormatUtils.parseCsvSchema(csvSchema, properties.get(FileFormatConstants.PROP_CSV_SCHEMA));
             Assert.assertEquals(13, csvSchema.size());
             Column decimalCol = csvSchema.get(10);
             Assert.assertEquals(10, decimalCol.getPrecision());
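    For illustration only (not part of the commit): the schema strings exercised above use the
    name:type;name:type form that FileFormatUtils.parseCsvSchema() parses. A hedged sketch of the matching TVF
    property; the key name "csv_schema" and the location are assumptions chosen for the example:

        SELECT * FROM hdfs(
            "uri" = "hdfs://namenode:8020/user/doris/demo/no_header.csv",  -- placeholder location
            "format" = "csv",
            "column_separator" = ",",
            "csv_schema" = "k1:int;k2:bigint;k3:decimal(10,2);k4:datetime");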
diff --git a/regression-test/suites/export_p2/test_export_max_file_size.groovy b/regression-test/suites/export_p2/test_export_max_file_size.groovy
index 0efe3a82cfa..460bc130268 100644
--- a/regression-test/suites/export_p2/test_export_max_file_size.groovy
+++ b/regression-test/suites/export_p2/test_export_max_file_size.groovy
@@ -73,8 +73,8 @@ suite("test_export_max_file_size", "p2") {
             insert into ${table_export_name}
             select * from hdfs(
             "uri" = "hdfs://${nameNodeHost}:${hdfsPort}${load_data_path}",
-            "fs.defaultFS" = "${fs}",
             "hadoop.username" = "${user_name}",
+            "column_separator" = ",",
             "format" = "csv");
         """
 
@@ -126,8 +126,8 @@ suite("test_export_max_file_size", "p2") {
                 insert into ${table_load_name}
                 select * from hdfs(
                 "uri" = "${outfile_url}${j}.csv",
-                "fs.defaultFS" = "${fs}",
                 "hadoop.username" = "${user_name}",
+                "column_separator" = ",",
                 "format" = "csv");
             """
         }
diff --git a/regression-test/suites/export_p2/test_export_with_hdfs.groovy b/regression-test/suites/export_p2/test_export_with_hdfs.groovy
index 205b1ffd716..d108d355e29 100644
--- a/regression-test/suites/export_p2/test_export_with_hdfs.groovy
+++ b/regression-test/suites/export_p2/test_export_with_hdfs.groovy
@@ -97,8 +97,8 @@ suite("test_export_with_hdfs", "p2") {
         // check data correctness
         order_qt_select """ select * from hdfs(
                 "uri" = "${outfile_url}0.${file_suffix}",
-                "fs.defaultFS" = "${fs}",
                 "hadoop.username" = "${user_name}",
+                "column_separator" = ",",
                 "format" = "${format}");
             """
     }
diff --git a/regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy b/regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy
index 3f9abe2c2b2..42a2354dd95 100644
--- a/regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy
+++ b/regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy
@@ -58,7 +58,6 @@ suite("test_outfile_orc_max_file_size", "p2") {
             insert into ${table_export_name}
             select * from hdfs(
             "uri" = "hdfs://${nameNodeHost}:${hdfsPort}${load_data_path}",
-            "fs.defaultFS" = "${fs}",
             "hadoop.username" = "${user_name}",
             "format" = "orc");
         """
diff --git a/regression-test/suites/external_table_p0/hive/test_different_parquet_types.groovy b/regression-test/suites/external_table_p0/hive/test_different_parquet_types.groovy
index beb3cd3e0cf..b9005037a5c 100644
--- a/regression-test/suites/external_table_p0/hive/test_different_parquet_types.groovy
+++ b/regression-test/suites/external_table_p0/hive/test_different_parquet_types.groovy
@@ -35,7 +35,7 @@ suite("test_different_parquet_types", "p0") {
             logger.info("record res" + res1_2.toString())
 
         def res1_3 = sql """
-            select * from hdfs(\"uri" = \"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_byte_array/delta_byte_array.parquet\",\"fs.defaultFS\" = \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10
+            select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_byte_array/delta_byte_array.parquet\",\"format\" = \"parquet\") limit 10
             """ 
             logger.info("record res" + res1_3.toString())
     }
@@ -58,7 +58,7 @@ suite("test_different_parquet_types", "p0") {
 
         //return nothing,but no exception
         def res3_3 = sql """
-            select * from hdfs(\"uri" = \"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_binary_packed/delta_binary_packed.parquet\",\"fs.defaultFS\" = \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10
+            select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_binary_packed/delta_binary_packed.parquet\",\"format\" = \"parquet\") limit 10
             """ 
             logger.info("record res" + res3_3.toString())
     }
@@ -76,7 +76,7 @@ suite("test_different_parquet_types", "p0") {
         logger.info("record res" + res4_2.toString())
 
         def res4_3 = sql """
-             select * from hdfs(\"uri" = \"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_encoding_required_column/delta_encoding_required_column.parquet\",\"fs.defaultFS\" = \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10
+             select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_encoding_required_column/delta_encoding_required_column.parquet\",\"format\" = \"parquet\") limit 10
              """ 
         logger.info("record res" + res4_3.toString())
     }
@@ -95,7 +95,7 @@ suite("test_different_parquet_types", "p0") {
         logger.info("record res" + res5_2.toString())
 
          def res5_3 = sql """
-        select * from hdfs(\"uri" = \"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_encoding_optional_column/delta_encoding_optional_column.parquet\",\"fs.defaultFS\" = \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10
+        select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_encoding_optional_column/delta_encoding_optional_column.parquet\",\"format\" = \"parquet\") limit 10
         """ 
         logger.info("record res" + res5_3.toString())
     }
@@ -114,7 +114,7 @@ suite("test_different_parquet_types", "p0") {
         logger.info("record res" + res6_2.toString())
 
         def res6_3 = sql """
-        select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/datapage_v1-snappy-compressed-checksum/datapage_v1-snappy-compressed-checksum.parquet\",\"fs.defaultFS\" = \"hdfs://${externalEnvIp}:${hdfs_port}\",\"format\" = \"parquet\") limit 10
+        select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/datapage_v1-snappy-compressed-checksum/datapage_v1-snappy-compressed-checksum.parquet\",\"format\" = \"parquet\") limit 10
         """ 
         logger.info("record res" + res6_3.toString())
 
@@ -133,7 +133,7 @@ suite("test_different_parquet_types", "p0") {
         logger.info("record res" + res7_2.toString())
 
          def res7_3 = sql """
-        select * from hdfs(\"uri" = \"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/overflow_i16_page_cnt/overflow_i16_page_cnt.parquet\",\"fs.defaultFS\" = \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10
+        select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/overflow_i16_page_cnt/overflow_i16_page_cnt.parquet\",\"format\" = \"parquet\") limit 10
         """ 
         logger.info("record res" + res7_3.toString())
     }
@@ -152,7 +152,7 @@ suite("test_different_parquet_types", "p0") {
         logger.info("record res" + res8_2.toString())
 
         def res8_3 = sql """
-        select * from hdfs(\"uri" = \"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/alltypes_tiny_pages/alltypes_tiny_pages.parquet\",\"fs.defaultFS\" = \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10
+        select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/alltypes_tiny_pages/alltypes_tiny_pages.parquet\",\"format\" = \"parquet\") limit 10
         """ 
         logger.info("record res" + res8_3.toString())
     }
@@ -170,7 +170,7 @@ suite("test_different_parquet_types", "p0") {
         logger.info("record res" + res9_2.toString())
 
          def res9_3 = sql """
-        select * from hdfs(\"uri" = \"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/alltypes_tiny_pages_plain/alltypes_tiny_pages_plain.parquet\",\"fs.defaultFS\" = \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10
+        select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/alltypes_tiny_pages_plain/alltypes_tiny_pages_plain.parquet\",\"format\" = \"parquet\") limit 10
         """ 
         logger.info("record res" + res9_3.toString())
     }
diff --git a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
index 0535eb6505a..a4b9bdd71c8 100644
--- a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
@@ -32,8 +32,8 @@ suite("test_hdfs_tvf") {
             format = "csv"
             qt_csv_all_types """ select * from HDFS(
                         "uri" = "${uri}",
-                        "fs.defaultFS"= "${defaultFS}",
                         "hadoop.username" = "${hdfsUserName}",
+                        "column_separator" = ",",
                         "format" = "${format}") order by c1; """
 
 
@@ -41,15 +41,14 @@ suite("test_hdfs_tvf") {
             format = "csv"
             qt_csv_student """ select cast(c1 as INT) as id, c2 as name, c3 as age from HDFS(
                         "uri" = "${uri}",
-                        "fs.defaultFS"= "${defaultFS}",
                         "hadoop.username" = "${hdfsUserName}",
+                        "column_separator" = ",",
                         "format" = "${format}") order by id; """
 
             uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/array_malformat.csv"
             format = "csv"
             qt_csv_array_malformat """ select * from HDFS(
                                         "uri" = "${uri}",
-                                        "fs.defaultFS"= "${defaultFS}",
                                         "hadoop.username" = "${hdfsUserName}",
                                         "format" = "${format}",
                                         "column_separator" = "|") order by c1; 
"""
@@ -57,7 +56,6 @@ suite("test_hdfs_tvf") {
             uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/array_normal.csv"
             format = "csv"
             qt_csv_array_normal """ select * from HDFS("uri" = "${uri}",
-                                    "fs.defaultFS"= "${defaultFS}",
                                     "hadoop.username" = "${hdfsUserName}",
                                     "format" = "${format}",
                                     "column_separator" = "|") order by c1; """
@@ -67,9 +65,9 @@ suite("test_hdfs_tvf") {
             format = "csv"
             qt_csv_with_compress_type """ select * from HDFS(
                         "uri" = "${uri}",
-                        "fs.defaultFS"= "${defaultFS}",
                         "hadoop.username" = "${hdfsUserName}",
                         "format" = "${format}",
+                        "column_separator" = ",",
                         "compress_type" = "GZ") order by c1; """
 
             // test csv format infer compress type
@@ -77,8 +75,8 @@ suite("test_hdfs_tvf") {
             format = "csv"
             qt_csv_infer_compress_type """ select * from HDFS(
                         "uri" = "${uri}",
-                        "fs.defaultFS"= "${defaultFS}",
                         "hadoop.username" = "${hdfsUserName}",
+                        "column_separator" = ",",
                         "format" = "${format}") order by c1; """
 
             // test csv_with_names file format
@@ -86,8 +84,8 @@ suite("test_hdfs_tvf") {
             format = "csv_with_names"
             qt_csv_names """ select cast(id as INT) as id, name, age from HDFS(
                             "uri" = "${uri}",
-                            "fs.defaultFS"= "${defaultFS}",
                             "hadoop.username" = "${hdfsUserName}",
+                            "column_separator" = ",",
                             "format" = "${format}") order by id; """
 
             // test csv_with_names_and_types file format
@@ -95,8 +93,8 @@ suite("test_hdfs_tvf") {
             format = "csv_with_names_and_types"
             qt_csv_names_types """ select cast(id as INT) as id, name, age from HDFS(
                                     "uri" = "${uri}",
-                                    "fs.defaultFS"= "${defaultFS}",
                                     "hadoop.username" = "${hdfsUserName}",
+                                    "column_separator" = ",",
                                     "format" = "${format}") order by id; """
 
 
@@ -105,7 +103,6 @@ suite("test_hdfs_tvf") {
             format = "parquet"
             qt_parquet """ select * from HDFS(
                             "uri" = "${uri}",
-                            "fs.defaultFS"= "${defaultFS}",
                             "hadoop.username" = "${hdfsUserName}",
                             "format" = "${format}") order by s_suppkey limit 
20; """
 
@@ -114,7 +111,6 @@ suite("test_hdfs_tvf") {
             format = "orc"
             qt_orc """ select * from HDFS(
                             "uri" = "${uri}",
-                            "fs.defaultFS"= "${defaultFS}",
                             "hadoop.username" = "${hdfsUserName}",
                             "format" = "${format}") order by p_partkey limit 
20; """
 
@@ -124,7 +120,6 @@ suite("test_hdfs_tvf") {
             format = "json"
             qt_json """ select * from HDFS(
                         "uri" = "${uri}",
-                        "fs.defaultFS"= "${defaultFS}",
                         "hadoop.username" = "${hdfsUserName}",
                         "format" = "${format}",
                         "strip_outer_array" = "false",
@@ -135,7 +130,6 @@ suite("test_hdfs_tvf") {
             format = "json"
             qt_json_limit1 """ select * from HDFS(
                         "uri" = "${uri}",
-                        "fs.defaultFS"= "${defaultFS}",
                         "hadoop.username" = "${hdfsUserName}",
                         "format" = "${format}",
                         "strip_outer_array" = "false",
@@ -145,7 +139,6 @@ suite("test_hdfs_tvf") {
             format = "json"
             qt_json_limit2 """ select * from HDFS(
                         "uri" = "${uri}",
-                        "fs.defaultFS"= "${defaultFS}",
                         "hadoop.username" = "${hdfsUserName}",
                         "format" = "${format}",
                         "strip_outer_array" = "true",
@@ -154,7 +147,6 @@ suite("test_hdfs_tvf") {
             format = "json"
             qt_json_limit3 """ select * from HDFS(
                         "uri" = "${uri}",
-                        "fs.defaultFS"= "${defaultFS}",
                         "hadoop.username" = "${hdfsUserName}",
                         "format" = "${format}",
                         "strip_outer_array" = "false",
@@ -163,7 +155,6 @@ suite("test_hdfs_tvf") {
             format = "json"
             qt_json_limit4 """ select * from HDFS(
                         "uri" = "${uri}",
-                        "fs.defaultFS"= "${defaultFS}",
                         "hadoop.username" = "${hdfsUserName}",
                         "format" = "${format}",
                         "strip_outer_array" = "false",
@@ -175,7 +166,6 @@ suite("test_hdfs_tvf") {
             format = "json"
             qt_json_root """ select cast(id as INT) as id, city, cast(code as INT) as code from HDFS(
                         "uri" = "${uri}",
-                        "fs.defaultFS"= "${defaultFS}",
                         "hadoop.username" = "${hdfsUserName}",
                         "format" = "${format}",
                         "strip_outer_array" = "false",
@@ -187,7 +177,6 @@ suite("test_hdfs_tvf") {
             format = "json"
             qt_json_paths """ select cast(id as INT) as id, cast(code as INT) as code from HDFS(
                         "uri" = "${uri}",
-                        "fs.defaultFS"= "${defaultFS}",
                         "hadoop.username" = "${hdfsUserName}",
                         "format" = "${format}",
                         "strip_outer_array" = "false",
@@ -199,7 +188,6 @@ suite("test_hdfs_tvf") {
             format = "json"
             qt_one_array """ select cast(id as INT) as id, city, cast(code as INT) as code from HDFS(
                             "uri" = "${uri}",
-                            "fs.defaultFS"= "${defaultFS}",
                             "hadoop.username" = "${hdfsUserName}",
                             "format" = "${format}",
                             "strip_outer_array" = "true",
@@ -211,7 +199,6 @@ suite("test_hdfs_tvf") {
             format = "json"
             qt_cast """ select cast(id as INT) as id, city, cast(code as INT) as code from HDFS(
                         "uri" = "${uri}",
-                        "fs.defaultFS"= "${defaultFS}",
                         "hadoop.username" = "${hdfsUserName}",
                         "format" = "${format}",
                         "strip_outer_array" = "false",
@@ -240,7 +227,6 @@ suite("test_hdfs_tvf") {
                     select cast (id as INT) as id, city, cast (code as INT) as code
                     from HDFS(
                         "uri" = "${uri}",
-                        "fs.defaultFS"= "${defaultFS}",
                         "hadoop.username" = "${hdfsUserName}",
                         "format" = "${format}",
                         "strip_outer_array" = "false",
@@ -256,7 +242,6 @@ suite("test_hdfs_tvf") {
             format = "parquet"
             qt_desc """ desc function HDFS(
                             "uri" = "${uri}",
-                            "fs.defaultFS"= "${defaultFS}",
                             "hadoop.username" = "${hdfsUserName}",
                             "format" = "${format}"); """
         } finally {
diff --git a/regression-test/suites/external_table_p2/tvf/test_hdfs_tvf_compression.groovy b/regression-test/suites/external_table_p2/tvf/test_hdfs_tvf_compression.groovy
index 40dc3c24405..d71e07487ca 100644
--- a/regression-test/suites/external_table_p2/tvf/test_hdfs_tvf_compression.groovy
+++ b/regression-test/suites/external_table_p2/tvf/test_hdfs_tvf_compression.groovy
@@ -30,7 +30,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem
         qt_gz_1 """
         select ${select_field} from HDFS(
             "uri" = "${baseUri}/dt=gzip/000000_0.gz",
-            "fs.defaultFS"= "${baseFs}",
             "hadoop.username" = "hadoop",
             "format" = "csv",
             "column_separator" = '\001',
@@ -40,7 +39,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem
         qt_gz_2 """
         desc function HDFS(
             "uri" = "${baseUri}/dt=gzip/000000_0.gz",
-            "fs.defaultFS"= "${baseFs}",
             "hadoop.username" = "hadoop",
             "format" = "csv",
             "column_separator" = '\001',
@@ -52,7 +50,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem
         select ${select_field} from 
         HDFS(
             "uri" = "${baseUri}/dt=bzip2/000000_0.bz2",
-            "fs.defaultFS"= "${baseFs}",
             "hadoop.username" = "hadoop",
             "format" = "csv",
             "column_separator" = '\001',
@@ -64,7 +61,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem
         select ${select_field} from         
         HDFS(
             "uri" = "${baseUri}/dt=deflate/000000_0_copy_1.deflate",
-            "fs.defaultFS"= "${baseFs}",
             "hadoop.username" = "hadoop",
             "format" = "csv",
             "column_separator" = '\001',
@@ -75,7 +71,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem
         select c7 from         
         HDFS(
             "uri" = "${baseUri}/dt=deflate/000000_0_copy_1.deflate",
-            "fs.defaultFS"= "${baseFs}",
             "hadoop.username" = "hadoop",
             "format" = "csv",
             "column_separator" = '\001',
@@ -88,7 +83,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem
         select ${select_field} from 
         HDFS(
             "uri" = "${baseUri}/dt=plain/000000_0",
-            "fs.defaultFS"= "${baseFs}",
             "hadoop.username" = "hadoop",
             "format" = "csv",
             "column_separator" = '\001',
@@ -99,7 +93,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem
         select c3,c4,c10 from 
         HDFS(
             "uri" = "${baseUri}/dt=plain/000000_0",
-            "fs.defaultFS"= "${baseFs}",
             "hadoop.username" = "hadoop",
             "format" = "csv",
             "column_separator" = '\001',
@@ -114,7 +107,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem
         select count(*) from 
         HDFS(
             "uri" = 
"${test_data_dir}/test_data/ckbench_hits.part-00000.snappy.parquet",
-            "fs.defaultFS" = "${baseFs}",
             "format" = "parquet"
         );
         """
@@ -124,7 +116,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem
         select count(*) from 
         HDFS(
             "uri" = 
"${test_data_dir}/test_data/ckbench_hits.part-00000.snappy.parquet",
-            "fs.defaultFS" = "${baseFs}",
             "format" = "parquet"
         );
         """
@@ -135,7 +126,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem
         select count(*) from 
         HDFS(
             "uri" = "${test_data_dir}/test_data/ckbench_hits.000000_0.orc",
-            "fs.defaultFS" = "${baseFs}",
             "format" = "orc"
         );
         """
@@ -145,7 +135,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem
         select count(*) from 
         HDFS(
             "uri" = "${test_data_dir}/test_data/ckbench_hits.000000_0.orc",
-            "fs.defaultFS" = "${baseFs}",
             "format" = "orc"
         );
         """
@@ -156,7 +145,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem
         select count(*) from 
         HDFS(
             "uri" = 
"${test_data_dir}/test_data/tpcds_catalog_returns_data-m-00000.txt",
-            "fs.defaultFS" = "${baseFs}",
             "format" = "csv"
         );
         """
@@ -166,7 +154,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem
         select count(*) from 
         HDFS(
             "uri" = 
"${test_data_dir}/test_data/tpcds_catalog_returns_data-m-00000.txt",
-            "fs.defaultFS" = "${baseFs}",
             "format" = "csv"
         );
         """
diff --git a/regression-test/suites/external_table_p2/tvf/test_path_partition_keys.groovy b/regression-test/suites/external_table_p2/tvf/test_path_partition_keys.groovy
index ad572936aec..dcd98af2f9d 100644
--- a/regression-test/suites/external_table_p2/tvf/test_path_partition_keys.groovy
+++ b/regression-test/suites/external_table_p2/tvf/test_path_partition_keys.groovy
@@ -27,7 +27,6 @@ suite("test_path_partition_keys", "p2,external,tvf,external_remote,external_remo
         order_qt_hdfs_1 """
         select * from HDFS(
             "uri" = "${baseUri}/dt1=cyw/*",
-            "fs.defaultFS"= "${baseFs}",
             "hadoop.username" = "hadoop",
             "format" = "csv",
             "path_partition_keys"="dt1" ) order by c1,c2 ;
@@ -36,7 +35,6 @@ suite("test_path_partition_keys", "p2,external,tvf,external_remote,external_remo
         order_qt_hdfs_2 """
         select * from HDFS(
             "uri" = "${baseUri}/dt1=cyw/*",
-            "fs.defaultFS"= "${baseFs}",
             "hadoop.username" = "hadoop",
             "format" = "csv",
             "path_partition_keys"="dt1") where dt1!="cyw" order by c1,c2 limit 
3;
@@ -45,7 +43,6 @@ suite("test_path_partition_keys", 
"p2,external,tvf,external_remote,external_remo
         order_qt_hdfs_3 """
         select dt1,c1,count(*) from HDFS(
             "uri" = "${baseUri}/dt1=hello/*",
-            "fs.defaultFS"= "${baseFs}",
             "hadoop.username" = "hadoop",
             "format" = "csv",
             "path_partition_keys"="dt1") group by c1,dt1 order by c1;
@@ -54,7 +51,6 @@ suite("test_path_partition_keys", "p2,external,tvf,external_remote,external_remo
         order_qt_hdfs_4 """
         select * from HDFS(
             "uri" = "${baseUri}/dt2=two/dt1=hello/*",
-            "fs.defaultFS"= "${baseFs}",
             "hadoop.username" = "hadoop",
             "format" = "csv",
             "path_partition_keys"="dt1") order by c1;
@@ -63,7 +59,6 @@ suite("test_path_partition_keys", "p2,external,tvf,external_remote,external_remo
         order_qt_hdfs_5 """
         select * from HDFS(
             "uri" = "${baseUri}/dt2=two/dt1=cyw/*",
-            "fs.defaultFS"= "${baseFs}",
             "hadoop.username" = "hadoop",
             "format" = "csv",
             "path_partition_keys"="dt2,dt1");
diff --git a/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy b/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy
index 57cfdb136d0..279fcb5e8a5 100644
--- a/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy
+++ b/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy
@@ -34,6 +34,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot
         "s3.secret_key" = "${sk}",     
         "REGION" = "${region}",    
         "FORMAT" = "csv",
+        "column_separator" = ",",
         "use_path_style" = "true",
         "compress_type" ="${compress_type}") order by c1,c2,c3,c4,c5 limit 20;
     """
@@ -47,6 +48,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot
         "s3.secret_key" = "${sk}",     
         "REGION" = "${region}",    
         "FORMAT" = "csv",
+        "column_separator" = ",",
         "use_path_style" = "true",
         "compress_type" ="${compress_type}") order by cast(c1 as int),c4 limit 
20;
     """
@@ -62,6 +64,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot
         "s3.secret_key" = "${sk}",
         "REGION" = "${region}",    
         "FORMAT" = "csv",
+        "column_separator" = ",",
         "use_path_style" = "true",
         "compress_type" ="${compress_type}") order by c1,c2,c3,c4,c5 limit 15;
     """
@@ -75,6 +78,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot
         "s3.secret_key" = "${sk}",
         "REGION" = "${region}",    
         "FORMAT" = "csv",
+        "column_separator" = ",",
         "use_path_style" = "true",
         "compress_type" ="${compress_type}")  where c1!="100"  order by 
cast(c4 as date),c1 limit 13;
     """
@@ -90,6 +94,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot
         "s3.secret_key" = "${sk}",     
         "REGION" = "${region}",    
         "FORMAT" = "csv",
+        "column_separator" = ",",
         "use_path_style" = "true",
         "compress_type" ="${compress_type}FRAME") order by c1,c2,c3,c4,c5  
limit 14;
     """
@@ -103,6 +108,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot
         "s3.secret_key" = "${sk}",     
         "REGION" = "${region}",    
         "FORMAT" = "csv",
+        "column_separator" = ",",
         "use_path_style" = "true",
         "compress_type" ="${compress_type}FRAME")  where 
c3="buHDwfGeNHfpRFdNaogneddi" order by c3,c1  limit 14;
     """
diff --git a/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy b/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy
index 202bc6b9148..08776efd8ec 100644
--- a/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy
+++ b/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy
@@ -23,43 +23,41 @@ suite("test_tvf_p2", "p2") {
 
         qt_eof_check """select * from hdfs(
             "uri" = 
"hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/bad_store_sales.parquet",
-            "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}",
             "format" = "parquet")
             where ss_store_sk = 4 and ss_addr_sk is null order by ss_item_sk"""
 
         // array_ancestor_null.parquet is parquet file whose values in the array column are all nulls in a page
         qt_array_ancestor_null """select count(list_double_col) from hdfs(
             "uri" = 
"hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/array_ancestor_null.parquet",
-            "format" = "parquet",
-            "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")"""
+            "format" = "parquet");
+        """
 
         // all_nested_types.parquet is parquet file that contains all complext types
         qt_nested_types_parquet """select count(array0), count(array1), count(array2), count(array3), count(struct0), count(struct1), count(map0)
             from hdfs(
             "uri" = 
"hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/all_nested_types.parquet",
-            "format" = "parquet",
-            "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")"""
+            "format" = "parquet");
+        """
 
         // all_nested_types.orc is orc file that contains all complext types
         qt_nested_types_orc """select count(array0), count(array1), count(array2), count(array3), count(struct0), count(struct1), count(map0)
             from hdfs(
             "uri" = 
"hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/orc/all_nested_types.orc",
-            "format" = "orc",
-            "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")"""
+            "format" = "orc");
+        """
 
         // a row of complex type may be stored across more pages
         qt_row_cross_pages """select count(id), count(m1), count(m2)
             from hdfs(
             "uri" = 
"hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages.parquet",
-            "format" = "parquet",
-            "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")"""
+            "format" = "parquet");
+        """
 
         // viewfs
         qt_viewfs """select count(id), count(m1), count(m2)
             from hdfs(
             "uri" = 
"viewfs://my-cluster/ns1/catalog/tvf/parquet/row_cross_pages.parquet",
             "format" = "parquet",
-            "fs.defaultFS" = "viewfs://my-cluster",
             "fs.viewfs.mounttable.my-cluster.link./ns1" = 
"hdfs://${nameNodeHost}:${hdfsPort}/",
             "fs.viewfs.mounttable.my-cluster.homedir" = "/ns1")"""
     }
diff --git a/regression-test/suites/external_table_p2/tvf/test_tvf_view_count_p2.groovy b/regression-test/suites/external_table_p2/tvf/test_tvf_view_count_p2.groovy
index 6510571427e..32b9bac9c70 100644
--- a/regression-test/suites/external_table_p2/tvf/test_tvf_view_count_p2.groovy
+++ b/regression-test/suites/external_table_p2/tvf/test_tvf_view_count_p2.groovy
@@ -26,8 +26,7 @@ suite("test_tvf_view_count_p2", "p2,external,tvf,external_remote,external_remote
         sql """use test_tvf_view_count_p2"""
         sql """set enable_nereids_planner=false"""
         sql """create view tvf_view_count as select * from hdfs (
-            "uri"="hdfs://${nameNodeHost}:${hdfsPort}:/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0",
-            "fs.defaultFS"="hdfs://${nameNodeHost}:${hdfsPort}",
+            "uri"="hdfs://${nameNodeHost}:${hdfsPort}/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0",
             "hadoop.username" = "hadoop",
             "format"="parquet");"""
 
diff --git a/regression-test/suites/external_table_p2/tvf/test_tvf_view_p2.groovy b/regression-test/suites/external_table_p2/tvf/test_tvf_view_p2.groovy
index 2323fcaff8a..8939154bb53 100644
--- a/regression-test/suites/external_table_p2/tvf/test_tvf_view_p2.groovy
+++ b/regression-test/suites/external_table_p2/tvf/test_tvf_view_p2.groovy
@@ -26,8 +26,7 @@ suite("test_tvf_view_p2", "p2,external,tvf,external_remote,external_remote_tvf")
         sql """use test_tvf_view_p2"""
         sql """set enable_fallback_to_original_planner=false"""
         sql """create view tvf_view as select * from hdfs (
-            "uri"="hdfs://${nameNodeHost}:${hdfsPort}:/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0",
-            "fs.defaultFS"="hdfs://${nameNodeHost}:${hdfsPort}",
+            "uri"="hdfs://${nameNodeHost}:${hdfsPort}/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0",
             "hadoop.username" = "hadoop",
             "format"="parquet");"""
 
@@ -48,8 +47,7 @@ suite("test_tvf_view_p2", "p2,external,tvf,external_remote,external_remote_tvf")
         }
         explain{
             sql("select * from hdfs (\n" +
-                    "  \"uri\"=\"hdfs://${nameNodeHost}:${hdfsPort}:/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0\",\n" +
-                    "  \"fs.defaultFS\"=\"hdfs://${nameNodeHost}:${hdfsPort}\",\n" +
+                    "  \"uri\"=\"hdfs://${nameNodeHost}:${hdfsPort}/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0\",\n" +
                     "  \"hadoop.username\" = \"hadoop\",\n" +
                     "  \"format\"=\"parquet\")")
             contains("_table_valued_function_hdfs.p_partkey")

