This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit c7a97c012d5bd66f7f595fd618748fecb9396ba8
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Thu Dec 15 16:56:32 2022 +0800

    [fix](resource) HdfsStorage can get default.Fs from path or configuration (#15079)
---
 .../org/apache/doris/analysis/OutFileClause.java   |  5 ++-
 .../java/org/apache/doris/backup/HdfsStorage.java  | 36 ++++++++---------
 .../org/apache/doris/catalog/HdfsResource.java     |  8 ----
 .../external_catalog_p0/hive/test_hive_other.out   |  8 ++++
 .../hive/test_hive_other.groovy                    | 47 +++++++++++++++++++++-
 5 files changed, 75 insertions(+), 29 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 31bd346b06..3d3f82b74b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -624,7 +624,6 @@ public class OutFileClause {
         } else if (filePath.toUpperCase().startsWith(HDFS_FILE_PREFIX.toUpperCase())) {
             brokerName = StorageBackend.StorageType.HDFS.name();
             storageType = StorageBackend.StorageType.HDFS;
-            filePath = filePath.substring(HDFS_FILE_PREFIX.length() - 1);
         } else {
             return;
         }
@@ -651,7 +650,9 @@ public class OutFileClause {
         if (storageType == StorageBackend.StorageType.S3) {
             S3Storage.checkS3(brokerProps);
         } else if (storageType == StorageBackend.StorageType.HDFS) {
-            HdfsStorage.checkHDFS(brokerProps);
+            if (!brokerProps.containsKey(HdfsResource.HADOOP_FS_NAME)) {
+                brokerProps.put(HdfsResource.HADOOP_FS_NAME, HdfsStorage.getFsName(filePath));
+            }
         }
 
         brokerDesc = new BrokerDesc(brokerName, storageType, brokerProps);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/HdfsStorage.java b/fe/fe-core/src/main/java/org/apache/doris/backup/HdfsStorage.java
index 6344a5ec75..e245ad6377 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/HdfsStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/HdfsStorage.java
@@ -23,7 +23,6 @@ import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.URI;
 
-import org.apache.commons.collections.map.CaseInsensitiveMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -49,6 +48,7 @@ import java.nio.file.FileVisitOption;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -58,7 +58,7 @@ import java.util.Map;
  */
 public class HdfsStorage extends BlobStorage {
     private static final Logger LOG = LogManager.getLogger(HdfsStorage.class);
-    private final Map<String, String> caseInsensitiveProperties;
+    private final Map<String, String> hdfsProperties;
 
     private final int readBufferSize = 128 << 10; // 128k
     private final int writeBufferSize = 128 << 10; // 128k
@@ -71,30 +71,26 @@ public class HdfsStorage extends BlobStorage {
      * @param properties parameters to access HDFS.
      */
     public HdfsStorage(Map<String, String> properties) {
-        caseInsensitiveProperties = new CaseInsensitiveMap();
+        hdfsProperties = new HashMap<>();
         setProperties(properties);
         setType(StorageBackend.StorageType.HDFS);
         setName(StorageBackend.StorageType.HDFS.name());
     }
 
-    public static void checkHDFS(Map<String, String> properties) throws UserException {
-        for (String field : HdfsResource.REQUIRED_FIELDS) {
-            if (!properties.containsKey(field)) {
-                throw new UserException(
-                        String.format("The properties of hdfs is invalid. %s are needed", field));
-            }
-        }
+    public static String getFsName(String path) {
+        Path hdfsPath = new Path(path);
+        String fullPath = hdfsPath.toUri().toString();
+        String filePath = hdfsPath.toUri().getPath();
+        return fullPath.replace(filePath, "");
     }
 
     @Override
     public FileSystem getFileSystem(String remotePath) throws UserException {
         if (dfsFileSystem == null) {
-            checkHDFS(caseInsensitiveProperties);
-            String hdfsFsName = caseInsensitiveProperties.get(HdfsResource.HADOOP_FS_NAME);
-            String username = caseInsensitiveProperties.get(HdfsResource.HADOOP_USER_NAME);
+            String username = hdfsProperties.get(HdfsResource.HADOOP_USER_NAME);
             Configuration conf = new HdfsConfiguration();
             boolean isSecurityEnabled = false;
-            for (Map.Entry<String, String> propEntry : caseInsensitiveProperties.entrySet()) {
+            for (Map.Entry<String, String> propEntry : hdfsProperties.entrySet()) {
                 conf.set(propEntry.getKey(), propEntry.getValue());
                 if (propEntry.getKey().equals(HdfsResource.HADOOP_SECURITY_AUTHENTICATION)
                         && propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) {
@@ -106,10 +102,14 @@ public class HdfsStorage extends BlobStorage {
             if (isSecurityEnabled) {
                 UserGroupInformation.setConfiguration(conf);
                 UserGroupInformation.loginUserFromKeytab(
-                        caseInsensitiveProperties.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL),
-                        caseInsensitiveProperties.get(HdfsResource.HADOOP_KERBEROS_KEYTAB));
+                        hdfsProperties.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL),
+                        hdfsProperties.get(HdfsResource.HADOOP_KERBEROS_KEYTAB));
+            }
+            if (username == null) {
+                dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf);
+            } else {
+                dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf, username);
             }
-            dfsFileSystem = FileSystem.get(java.net.URI.create(hdfsFsName), conf, username);
         } catch (Exception e) {
             LOG.error("errors while connect to " + remotePath, e);
             throw new UserException("errors while connect to " + remotePath, e);
@@ -121,7 +121,7 @@ public class HdfsStorage extends BlobStorage {
     @Override
     public void setProperties(Map<String, String> properties) {
         super.setProperties(properties);
-        caseInsensitiveProperties.putAll(properties);
+        hdfsProperties.putAll(properties);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
index 868f032a44..5e0a5fe874 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
@@ -27,8 +27,6 @@ import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -56,7 +54,6 @@ public class HdfsResource extends Resource {
     public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
     public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
     public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
"dfs.domain.socket.path"; - public static List<String> REQUIRED_FIELDS = Collections.singletonList(HADOOP_FS_NAME); @SerializedName(value = "properties") private Map<String, String> properties; @@ -75,11 +72,6 @@ public class HdfsResource extends Resource { @Override protected void setProperties(Map<String, String> properties) throws DdlException { - for (String field : REQUIRED_FIELDS) { - if (!properties.containsKey(field)) { - throw new DdlException("Missing [" + field + "] in properties."); - } - } // `dfs.client.read.shortcircuit` and `dfs.domain.socket.path` should be both set to enable short circuit read. // We should disable short circuit read if they are not both set because it will cause performance down. if (!properties.containsKey(HADOOP_SHORT_CIRCUIT) || !properties.containsKey(HADOOP_SOCKET_PATH)) { diff --git a/regression-test/data/external_catalog_p0/hive/test_hive_other.out b/regression-test/data/external_catalog_p0/hive/test_hive_other.out index 86af4c14ec..8d44514ed7 100644 --- a/regression-test/data/external_catalog_p0/hive/test_hive_other.out +++ b/regression-test/data/external_catalog_p0/hive/test_hive_other.out @@ -671,3 +671,11 @@ zyLjAtVdXV GrJRf8WvRR 2022-11-25 2022-11-25 zj9uWRywHa 5F8hzYcY8G 2022-11-25 2022-11-25 2022-11-25 zvs3b72ERY zorbigHkYB 2022-11-25 +-- !student -- +124 lisi 13 f abcdefh 13056781234 +123 zhangsan 12 m abcdefg 13012345678 + +-- !tvf_student -- +124 lisi 13 f abcdefh 13056781234 +123 zhangsan 12 m abcdefg 13012345678 + diff --git a/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy b/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy index 82b6bb1cfc..f8401d70e9 100644 --- a/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy +++ b/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy @@ -77,6 +77,7 @@ suite("test_hive_other", "p0") { String enabled = context.config.otherConfigs.get("enableHiveTest") if (enabled != null && enabled.equalsIgnoreCase("true")) { String hms_port = context.config.otherConfigs.get("hms_port") + String hdfs_port = context.config.otherConfigs.get("hdfs_port") String catalog_name = "hive_test_other" set_be_config.call() @@ -100,7 +101,51 @@ suite("test_hive_other", "p0") { // order_qt_show_tables2 """show tables""" q01() sql """refresh table `default`.table_with_vertical_line""" - order_qt_after_refresh """ select dt, dt, k2, k5, dt from table_with_vertical_line where dt in ('2022-11-25') or dt in ('2022-11-24') order by k2 desc limit 10;""" + order_qt_after_refresh """ select dt, dt, k2, k5, dt from table_with_vertical_line where dt in ('2022-11-25') or dt in ('2022-11-24') order by k2 desc limit 10;""" + + // external table + sql """switch internal""" + sql """drop database if exists external_hive_table_test""" + sql """create database external_hive_table_test""" + sql """use external_hive_table_test""" + sql """drop table if exists external_hive_student""" + + sql """ + create external table `external_hive_student` ( + `id` varchar(100), + `name` varchar(100), + `age` int, + `gender` varchar(100), + `addr` varchar(100), + `phone` varchar(100) + ) ENGINE=HIVE + PROPERTIES + ( + 'hive.metastore.uris' = 'thrift://127.0.0.1:${hms_port}', + 'database' = 'default', + 'table' = 'student' + ); + """ + qt_student """select * from external_hive_student order by name;""" + + // read external table + String csv_output_dir = UUID.randomUUID().toString() + sql """ + select * from external_hive_student + into outfile 
"hdfs://127.0.0.1:${hdfs_port}/user/test/student/${csv_output_dir}/csv_" + format as csv_with_names + properties ( + "column_separator" = ",", + "line_delimiter" = "\n" + ); + """ + qt_tvf_student """ + select * from hdfs ( + "format" = "csv_with_names", + "fs.defaultFS" = "hdfs://127.0.0.1:${hdfs_port}", + "uri" = "hdfs://127.0.0.1:${hdfs_port}/user/test/student/${csv_output_dir}/csv_*" + ) order by name; + """ } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org