This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit a0256f3ebd8412a51004d5beba4dfa228c1658c9
Author: morningman <morning...@163.com>
AuthorDate: Wed Jun 15 16:06:29 2022 +0800

[feature] Support reading Hive external tables and outfile into HDFS authenticated by Kerberos (#9579)

At present, Doris can only access a Hadoop cluster with Kerberos authentication enabled through a broker; Doris BE itself does not support accessing a Kerberos-authenticated HDFS file. This PR aims to solve that problem.

When creating a Hive external table, users only need to specify the following properties to access HDFS data with Kerberos authentication enabled:

```sql
CREATE EXTERNAL TABLE t_hive (
  k1 int NOT NULL COMMENT "",
  k2 char(10) NOT NULL COMMENT "",
  k3 datetime NOT NULL COMMENT "",
  k5 varchar(20) NOT NULL COMMENT "",
  k6 double NOT NULL COMMENT ""
) ENGINE=HIVE
COMMENT "HIVE"
PROPERTIES (
  'hive.metastore.uris' = 'thrift://192.168.0.1:9083',
  'database' = 'hive_db',
  'table' = 'hive_table',
  'dfs.nameservices'='hacluster',
  'dfs.ha.namenodes.hacluster'='n1,n2',
  'dfs.namenode.rpc-address.hacluster.n1'='192.168.0.1:8020',
  'dfs.namenode.rpc-address.hacluster.n2'='192.168.0.2:8020',
  'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider',
  'dfs.namenode.kerberos.principal'='hadoop/_h...@realm.com',
  'hadoop.security.authentication'='kerberos',
  'hadoop.kerberos.principal'='doris_t...@realm.com',
  'hadoop.kerberos.keytab'='/path/to/doris_test.keytab'
);
```

If you want to `select into outfile` to HDFS with Kerberos authentication enabled, you can refer to the following SQL statement:

```sql
select * from test
into outfile "hdfs://tmp/outfile1"
format as csv
properties (
  'fs.defaultFS'='hdfs://hacluster/',
  'dfs.nameservices'='hacluster',
  'dfs.ha.namenodes.hacluster'='n1,n2',
  'dfs.namenode.rpc-address.hacluster.n1'='192.168.0.1:8020',
  'dfs.namenode.rpc-address.hacluster.n2'='192.168.0.2:8020',
  'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider',
  'dfs.namenode.kerberos.principal'='hadoop/_h...@realm.com',
  'hadoop.security.authentication'='kerberos',
  'hadoop.kerberos.principal'='doris_t...@realm.com',
  'hadoop.kerberos.keytab'='/path/to/doris_test.keytab'
);
```
---
 be/src/exec/CMakeLists.txt | 1 +
 be/src/exec/hdfs_builder.cpp | 78 ++++++++++++++++++++
 be/src/exec/hdfs_builder.h | 47 ++++++++++++
 be/src/exec/hdfs_file_reader.cpp | 31 ++------
 be/src/exec/hdfs_file_reader.h | 6 +-
 be/src/exec/hdfs_writer.cpp | 85 +++++++++-------------
 be/src/exec/hdfs_writer.h | 12 ++-
 .../org/apache/doris/analysis/OutFileClause.java | 14 +++-
 .../java/org/apache/doris/catalog/AuthType.java | 60 +++++++++++++++
 .../doris/catalog/HiveMetaStoreClientHelper.java | 13 ++++
 .../java/org/apache/doris/catalog/HiveTable.java | 48 ++++++++++--
 .../org/apache/doris/common/util/BrokerUtil.java | 57 +++++++++------
 .../org/apache/doris/planner/BrokerScanNode.java | 3 +
 .../org/apache/doris/planner/HiveScanNode.java | 2 +-
 .../org/apache/doris/catalog/HiveTableTest.java | 4 +-
 gensrc/thrift/PlanNodes.thrift | 7 +-
 16 files changed, 340 insertions(+), 128 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 7940c8ac61..e09169c000 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -105,6 +105,7 @@ set(EXEC_FILES hdfs_reader_writer.cpp hdfs_file_reader.cpp hdfs_writer.cpp + hdfs_builder.cpp ) if (WITH_MYSQL) diff --git a/be/src/exec/hdfs_builder.cpp b/be/src/exec/hdfs_builder.cpp new file mode 100644 index
0000000000..5940958050 --- /dev/null +++ b/be/src/exec/hdfs_builder.cpp @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/hdfs_builder.h" + +#include <fmt/format.h> + +#include <fstream> + +#include "agent/utils.h" +#include "common/logging.h" +#include "util/uid_util.h" +#include "util/url_coding.h" +namespace doris { + +const std::string TICKET_CACHE_PATH = "/tmp/krb5cc_doris_"; + +Status HDFSCommonBuilder::run_kinit() { + if (hdfs_kerberos_principal.empty() || hdfs_kerberos_keytab.empty()) { + return Status::InvalidArgument("Invalid hdfs_kerberos_principal or hdfs_kerberos_keytab"); + } + std::string ticket_path = TICKET_CACHE_PATH + generate_uuid_string(); + fmt::memory_buffer kinit_command; + fmt::format_to(kinit_command, "kinit -c {} -R -t {} -k {}", ticket_path, hdfs_kerberos_keytab, + hdfs_kerberos_principal); + VLOG_NOTICE << "kinit command: " << fmt::to_string(kinit_command); + std::string msg; + AgentUtils util; + bool rc = util.exec_cmd(fmt::to_string(kinit_command), &msg); + if (!rc) { + return Status::InternalError("Kinit failed, errMsg: " + msg); + } + hdfsBuilderSetKerbTicketCachePath(hdfs_builder, ticket_path.c_str()); + return Status::OK(); +} + +HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams) { + HDFSCommonBuilder builder; + hdfsBuilderSetNameNode(builder.get(), hdfsParams.fs_name.c_str()); + // set hdfs user + if (hdfsParams.__isset.user) { + hdfsBuilderSetUserName(builder.get(), hdfsParams.user.c_str()); + } + // set kerberos conf + if (hdfsParams.__isset.hdfs_kerberos_principal) { + builder.need_kinit = true; + builder.hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal; + hdfsBuilderSetPrincipal(builder.get(), hdfsParams.hdfs_kerberos_principal.c_str()); + } + if (hdfsParams.__isset.hdfs_kerberos_keytab) { + builder.need_kinit = true; + builder.hdfs_kerberos_keytab = hdfsParams.hdfs_kerberos_keytab; + } + // set other conf + if (hdfsParams.__isset.hdfs_conf) { + for (const THdfsConf& conf : hdfsParams.hdfs_conf) { + hdfsBuilderConfSetStr(builder.get(), conf.key.c_str(), conf.value.c_str()); + } + } + + return builder; +} + +} // namespace doris diff --git a/be/src/exec/hdfs_builder.h b/be/src/exec/hdfs_builder.h new file mode 100644 index 0000000000..70ac723f1c --- /dev/null +++ b/be/src/exec/hdfs_builder.h @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <hdfs/hdfs.h> + +#include "gen_cpp/PlanNodes_types.h" +#include "exec/file_reader.h" + +namespace doris { + +class HDFSCommonBuilder { + friend HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams); + +public: + HDFSCommonBuilder() : hdfs_builder(hdfsNewBuilder()) {}; + ~HDFSCommonBuilder() { hdfsFreeBuilder(hdfs_builder); }; + + hdfsBuilder* get() { return hdfs_builder; }; + bool is_need_kinit() { return need_kinit; }; + Status run_kinit(); + +private: + hdfsBuilder* hdfs_builder; + bool need_kinit {false}; + std::string hdfs_kerberos_keytab; + std::string hdfs_kerberos_principal; +}; + +HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams); + +} // namespace doris diff --git a/be/src/exec/hdfs_file_reader.cpp b/be/src/exec/hdfs_file_reader.cpp index a047df6236..b042018e55 100644 --- a/be/src/exec/hdfs_file_reader.cpp +++ b/be/src/exec/hdfs_file_reader.cpp @@ -31,7 +31,8 @@ HdfsFileReader::HdfsFileReader(const THdfsParams& hdfs_params, const std::string _current_offset(start_offset), _file_size(-1), _hdfs_fs(nullptr), - _hdfs_file(nullptr) { + _hdfs_file(nullptr), + _builder(createHDFSBuilder(_hdfs_params)) { _namenode = _hdfs_params.fs_name; } @@ -40,32 +41,10 @@ HdfsFileReader::~HdfsFileReader() { } Status HdfsFileReader::connect() { - hdfsBuilder* hdfs_builder = hdfsNewBuilder(); - hdfsBuilderSetNameNode(hdfs_builder, _namenode.c_str()); - // set hdfs user - if (_hdfs_params.__isset.user) { - hdfsBuilderSetUserName(hdfs_builder, _hdfs_params.user.c_str()); + if (_builder.is_need_kinit()) { + RETURN_IF_ERROR(_builder.run_kinit()); } - // set kerberos conf - if (_hdfs_params.__isset.kerb_principal) { - hdfsBuilderSetPrincipal(hdfs_builder, _hdfs_params.kerb_principal.c_str()); - } - if (_hdfs_params.__isset.kerb_ticket_cache_path) { - hdfsBuilderSetKerbTicketCachePath(hdfs_builder, - _hdfs_params.kerb_ticket_cache_path.c_str()); - } - // set token - if (_hdfs_params.__isset.token) { - hdfsBuilderSetToken(hdfs_builder, _hdfs_params.token.c_str()); - } - // set other conf - if (_hdfs_params.__isset.hdfs_conf) { - for (const THdfsConf& conf : _hdfs_params.hdfs_conf) { - hdfsBuilderConfSetStr(hdfs_builder, conf.key.c_str(), conf.value.c_str()); - } - } - _hdfs_fs = hdfsBuilderConnect(hdfs_builder); - hdfsFreeBuilder(hdfs_builder); + _hdfs_fs = hdfsBuilderConnect(_builder.get()); if (_hdfs_fs == nullptr) { std::stringstream ss; ss << "connect failed. 
" << _namenode; diff --git a/be/src/exec/hdfs_file_reader.h b/be/src/exec/hdfs_file_reader.h index d4430de3e2..e81af8a186 100644 --- a/be/src/exec/hdfs_file_reader.h +++ b/be/src/exec/hdfs_file_reader.h @@ -17,10 +17,9 @@ #pragma once -#include <hdfs/hdfs.h> - -#include "exec/file_reader.h" #include "gen_cpp/PlanNodes_types.h" +#include "exec/file_reader.h" +#include "exec/hdfs_builder.h" namespace doris { @@ -56,6 +55,7 @@ private: int64_t _file_size; hdfsFS _hdfs_fs; hdfsFile _hdfs_file; + HDFSCommonBuilder _builder; }; } // namespace doris diff --git a/be/src/exec/hdfs_writer.cpp b/be/src/exec/hdfs_writer.cpp index 8aaae7e78e..b45fbe4449 100644 --- a/be/src/exec/hdfs_writer.cpp +++ b/be/src/exec/hdfs_writer.cpp @@ -24,23 +24,32 @@ namespace doris { const static std::string FS_KEY = "fs.defaultFS"; -const static std::string USER = "hdfs_user"; -const static std::string KERBEROS_PRINCIPAL = "kerberos_principal"; -const static std::string KERB_TICKET_CACHE_PATH = "kerb_ticket_cache_path"; +const static std::string USER = "hadoop.username"; +const static std::string KERBEROS_PRINCIPAL = "hadoop.kerberos.principal"; +const static std::string KERBEROS_KEYTAB = "hadoop.kerberos.keytab"; const static std::string TOKEN = "token"; HDFSWriter::HDFSWriter(std::map<std::string, std::string>& properties, const std::string& path) : _properties(properties), _path(path), - _hdfs_fs(nullptr) { - _parse_properties(_properties); -} + _hdfs_fs(nullptr), + _hdfs_params(_parse_properties(_properties)), + _builder(createHDFSBuilder(_hdfs_params)) {} HDFSWriter::~HDFSWriter() { close(); } Status HDFSWriter::open() { + if (_namenode.empty()) { + LOG(WARNING) << "hdfs properties is incorrect."; + return Status::InternalError("hdfs properties is incorrect"); + } + // if the format of _path is hdfs://ip:port/path, replace it to /path. + // path like hdfs://ip:port/path can't be used by libhdfs3. + if (_path.find(_namenode) != _path.npos) { + _path = _path.substr(_namenode.size()); + } RETURN_IF_ERROR(_connect()); if (_hdfs_fs == nullptr) { return Status::InternalError("HDFS writer open without client"); @@ -130,31 +139,10 @@ Status HDFSWriter::close() { } Status HDFSWriter::_connect() { - hdfsBuilder* hdfs_builder = hdfsNewBuilder(); - hdfsBuilderSetNameNode(hdfs_builder, _namenode.c_str()); - // set hdfs user - if (!_user.empty()) { - hdfsBuilderSetUserName(hdfs_builder, _user.c_str()); - } - // set kerberos conf - if (!_kerb_principal.empty()) { - hdfsBuilderSetPrincipal(hdfs_builder, _kerb_principal.c_str()); - } - if (!_kerb_ticket_cache_path.empty()) { - hdfsBuilderSetKerbTicketCachePath(hdfs_builder, _kerb_ticket_cache_path.c_str()); - } - // set token - if (!_token.empty()) { - hdfsBuilderSetToken(hdfs_builder, _token.c_str()); - } - // set other conf - if (!_properties.empty()) { - std::map<std::string, std::string>::iterator iter; - for (iter = _properties.begin(); iter != _properties.end(); ++iter) { - hdfsBuilderConfSetStr(hdfs_builder, iter->first.c_str(), iter->second.c_str()); - } + if (_builder.is_need_kinit()) { + RETURN_IF_ERROR(_builder.run_kinit()); } - _hdfs_fs = hdfsBuilderConnect(hdfs_builder); + _hdfs_fs = hdfsBuilderConnect(_builder.get()); if (_hdfs_fs == nullptr) { std::stringstream ss; ss << "connect failed. 
namenode:" << _namenode; @@ -163,41 +151,34 @@ Status HDFSWriter::_connect() { return Status::OK(); } -Status HDFSWriter::_parse_properties(std::map<std::string, std::string>& prop) { +THdfsParams HDFSWriter::_parse_properties(std::map<std::string, std::string>& prop) { std::map<std::string, std::string>::iterator iter; + std::vector<THdfsConf> hdfs_configs; + THdfsParams hdfsParams; for (iter = prop.begin(); iter != prop.end();) { if (iter->first.compare(FS_KEY) == 0) { _namenode = iter->second; + hdfsParams.__set_fs_name(_namenode); iter = prop.erase(iter); } else if (iter->first.compare(USER) == 0) { - _user = iter->second; + hdfsParams.__set_user(iter->second); iter = prop.erase(iter); } else if (iter->first.compare(KERBEROS_PRINCIPAL) == 0) { - _kerb_principal = iter->second; + hdfsParams.__set_hdfs_kerberos_principal(iter->second); iter = prop.erase(iter); - } else if (iter->first.compare(KERB_TICKET_CACHE_PATH) == 0) { - _kerb_ticket_cache_path = iter->second; - iter = prop.erase(iter); - } else if (iter->first.compare(TOKEN) == 0) { - _token = iter->second; + } else if (iter->first.compare(KERBEROS_KEYTAB) == 0) { + hdfsParams.__set_hdfs_kerberos_keytab(iter->second); iter = prop.erase(iter); } else { - ++iter; + THdfsConf item; + item.key = iter->first; + item.value = iter->second; + hdfs_configs.push_back(item); + iter = prop.erase(iter); } } - - if (_namenode.empty()) { - LOG(WARNING) << "hdfs properties is incorrect."; - return Status::InternalError("hdfs properties is incorrect"); - } - - // if the format of _path is hdfs://ip:port/path, replace it to /path. - // path like hdfs://ip:port/path can't be used by libhdfs3. - if (_path.find(_namenode) != _path.npos) { - _path = _path.substr(_namenode.size()); - } - - return Status::OK(); + hdfsParams.__set_hdfs_conf(hdfs_configs); + return hdfsParams; } }// end namespace doris diff --git a/be/src/exec/hdfs_writer.h b/be/src/exec/hdfs_writer.h index a3f17ec166..2b99bacb5e 100644 --- a/be/src/exec/hdfs_writer.h +++ b/be/src/exec/hdfs_writer.h @@ -17,12 +17,12 @@ #pragma once -#include <hdfs/hdfs.h> - #include <map> #include <string> +#include "gen_cpp/PlanNodes_types.h" #include "exec/file_writer.h" +#include "exec/hdfs_builder.h" namespace doris { class HDFSWriter : public FileWriter { @@ -40,18 +40,16 @@ public: private: Status _connect(); - Status _parse_properties(std::map<std::string, std::string>& prop); + THdfsParams _parse_properties(std::map<std::string, std::string>& prop); std::map<std::string, std::string> _properties; - std::string _user = ""; std::string _namenode = ""; std::string _path = ""; - std::string _kerb_principal = ""; - std::string _kerb_ticket_cache_path = ""; - std::string _token = ""; hdfsFS _hdfs_fs = nullptr; hdfsFile _hdfs_file = nullptr; bool _closed = false; + THdfsParams _hdfs_params; + HDFSCommonBuilder _builder; }; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index c82cd1ab62..1817bf7d2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -25,6 +25,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.common.util.ParseUtil; import org.apache.doris.common.util.PrintableMap; import 
org.apache.doris.qe.ConnectContext; @@ -87,7 +88,8 @@ public class OutFileClause { public static final String LOCAL_FILE_PREFIX = "file:///"; private static final String S3_FILE_PREFIX = "S3://"; private static final String HDFS_FILE_PREFIX = "hdfs://"; - private static final String HDFS_PROP_PREFIX = "hdfs."; + private static final String HADOOP_FS_PROP_PREFIX = "dfs."; + private static final String HADOOP_PROP_PREFIX = "hadoop."; private static final String BROKER_PROP_PREFIX = "broker."; private static final String PROP_BROKER_NAME = "broker.name"; private static final String PROP_COLUMN_SEPARATOR = "column_separator"; @@ -416,9 +418,13 @@ public class OutFileClause { } else if (entry.getKey().toUpperCase().startsWith(S3Storage.S3_PROPERTIES_PREFIX)) { brokerProps.put(entry.getKey(), entry.getValue()); processedPropKeys.add(entry.getKey()); - } else if (entry.getKey().startsWith(HDFS_PROP_PREFIX) - && storageType == StorageBackend.StorageType.HDFS) { - brokerProps.put(entry.getKey().substring(HDFS_PROP_PREFIX.length()), entry.getValue()); + } else if (entry.getKey().contains(BrokerUtil.HADOOP_FS_NAME) + && storageType == StorageBackend.StorageType.HDFS) { + brokerProps.put(entry.getKey(), entry.getValue()); + processedPropKeys.add(entry.getKey()); + } else if ((entry.getKey().startsWith(HADOOP_FS_PROP_PREFIX) || entry.getKey().startsWith(HADOOP_PROP_PREFIX)) + && storageType == StorageBackend.StorageType.HDFS) { + brokerProps.put(entry.getKey(), entry.getValue()); processedPropKeys.add(entry.getKey()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthType.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthType.java new file mode 100644 index 0000000000..c0c97530a0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthType.java @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.catalog; + +/** + * Define different auth type for external table such as hive/iceberg, + * so that BE could call secured under fileStorageSystem (enable kerberos) + */ +public enum AuthType { + SIMPLE(0, "simple"), + KERBEROS(1, "kerberos"); + + private int code; + private String desc; + + AuthType(int code, String desc) { + this.code = code; + this.desc = desc; + } + + public static boolean isSupportedAuthType(String authType) { + for (AuthType auth : values()) { + if (auth.getDesc().equals(authType)) { + return true; + } + } + return false; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public String getDesc() { + return desc; + } + + public void setDesc(String desc) { + this.desc = desc; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java index c378eecf24..f192df298b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java @@ -31,6 +31,7 @@ import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.common.DdlException; +import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TExprOpcode; @@ -54,6 +55,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -245,14 +247,25 @@ public class HiveMetaStoreClientHelper { private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table, Map<String, String> properties) throws DdlException { List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>(); Configuration configuration = new Configuration(false); + boolean isSecurityEnabled = false; for (Map.Entry<String, String> entry : properties.entrySet()) { if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) { configuration.set(entry.getKey(), entry.getValue()); } + if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION) + && entry.getValue().equals(AuthType.KERBEROS.getDesc())) { + isSecurityEnabled = true; + } } String location = table.getSd().getLocation(); org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location); try { + if (isSecurityEnabled) { + UserGroupInformation.setConfiguration(configuration); + // login user from keytab + UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL), + properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB)); + } FileSystem fileSystem = path.getFileSystem(configuration); iterators.add(fileSystem.listLocatedStatus(path)); } catch (IOException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java index d418e47307..19be317f12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java @@ -19,6 +19,7 
@@ package org.apache.doris.catalog; import org.apache.doris.common.DdlException; import org.apache.doris.common.io.Text; +import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.thrift.THiveTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; @@ -39,6 +40,7 @@ import java.util.Map; */ public class HiveTable extends Table { private static final String PROPERTY_MISSING_MSG = "Hive %s is null. Please add properties('%s'='xxx') when create table"; + private static final String PROPERTY_ERROR_MSG = "Hive table properties('%s'='%s') is illegal or not supported. Please check it"; private static final String HIVE_DB = "database"; private static final String HIVE_TABLE = "table"; @@ -77,7 +79,7 @@ public class HiveTable extends Table { private void validate(Map<String, String> properties) throws DdlException { if (properties == null) { throw new DdlException("Please set properties of hive table, " - + "they are: database, table and 'hive.metastore.uris'"); + + "they are: database, table and 'hive.metastore.uris'"); } Map<String, String> copiedProps = Maps.newHashMap(properties); @@ -94,14 +96,48 @@ public class HiveTable extends Table { copiedProps.remove(HIVE_TABLE); // check hive properties - // hive.metastore.uris - String hiveMetastoreUris = copiedProps.get(HIVE_METASTORE_URIS); - if (Strings.isNullOrEmpty(hiveMetastoreUris)) { + // hive.metastore.uris + String hiveMetaStoreUris = copiedProps.get(HIVE_METASTORE_URIS); + if (Strings.isNullOrEmpty(hiveMetaStoreUris)) { throw new DdlException(String.format(PROPERTY_MISSING_MSG, HIVE_METASTORE_URIS, HIVE_METASTORE_URIS)); } copiedProps.remove(HIVE_METASTORE_URIS); - hiveProperties.put(HIVE_METASTORE_URIS, hiveMetastoreUris); + hiveProperties.put(HIVE_METASTORE_URIS, hiveMetaStoreUris); + // check auth type + String authType = copiedProps.get(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION); + if (Strings.isNullOrEmpty(authType)) { + authType = AuthType.SIMPLE.getDesc(); + } + if (!AuthType.isSupportedAuthType(authType)) { + throw new DdlException(String.format(PROPERTY_ERROR_MSG, BrokerUtil.HADOOP_SECURITY_AUTHENTICATION, authType)); + } + copiedProps.remove(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION); + hiveProperties.put(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION, authType); + + if (AuthType.KERBEROS.getDesc().equals(authType)) { + // check principal + String principal = copiedProps.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL); + if (Strings.isNullOrEmpty(principal)) { + throw new DdlException(String.format(PROPERTY_MISSING_MSG, BrokerUtil.HADOOP_KERBEROS_PRINCIPAL, BrokerUtil.HADOOP_KERBEROS_PRINCIPAL)); + } + hiveProperties.put(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL, principal); + copiedProps.remove(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL); + // check keytab + String keytabPath = copiedProps.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB); + if (Strings.isNullOrEmpty(keytabPath)) { + throw new DdlException(String.format(PROPERTY_MISSING_MSG, BrokerUtil.HADOOP_KERBEROS_KEYTAB, BrokerUtil.HADOOP_KERBEROS_KEYTAB)); + } + if (!Strings.isNullOrEmpty(keytabPath)) { + hiveProperties.put(BrokerUtil.HADOOP_KERBEROS_KEYTAB, keytabPath); + copiedProps.remove(BrokerUtil.HADOOP_KERBEROS_KEYTAB); + } + } + String HDFSUserName = copiedProps.get(BrokerUtil.HADOOP_USER_NAME); + if (!Strings.isNullOrEmpty(HDFSUserName)) { + hiveProperties.put(BrokerUtil.HADOOP_USER_NAME, HDFSUserName); + copiedProps.remove(BrokerUtil.HADOOP_USER_NAME); + } if (!copiedProps.isEmpty()) { Iterator<Map.Entry<String, String>> iter = 
copiedProps.entrySet().iterator(); while(iter.hasNext()) { @@ -148,7 +184,7 @@ public class HiveTable extends Table { public TTableDescriptor toThrift() { THiveTable tHiveTable = new THiveTable(getHiveDb(), getHiveTable(), getHiveProperties()); TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, - fullSchema.size(), 0, getName(), ""); + fullSchema.size(), 0, getName(), ""); tTableDescriptor.setHiveTable(tHiveTable); return tTableDescriptor; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index 9e170e47b7..c10256e69b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -22,6 +22,7 @@ import org.apache.doris.analysis.StorageBackend; import org.apache.doris.backup.RemoteFile; import org.apache.doris.backup.S3Storage; import org.apache.doris.backup.Status; +import org.apache.doris.catalog.AuthType; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.FsBroker; import org.apache.doris.common.AnalysisException; @@ -64,6 +65,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -82,27 +84,26 @@ import java.util.Map; public class BrokerUtil { private static final Logger LOG = LogManager.getLogger(BrokerUtil.class); - private static int READ_BUFFER_SIZE_B = 1024 * 1024; - private static String HDFS_FS_KEY = "fs.defaultFS"; - private static String HDFS_USER_KEY = "hdfs_user"; - private static String HDFS_KERB_PRINCIPAL = "kerb_principal"; - private static String HDFS_KERB_TICKET_CACHE_PATH = "kerb_ticket_cache_path"; - private static String HDFS_KERB_TOKEN = "kerb_token"; + private static final int READ_BUFFER_SIZE_B = 1024 * 1024; + public static String HADOOP_FS_NAME = "fs.defaultFS"; + // simple or kerberos + public static String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication"; + public static String HADOOP_USER_NAME = "hadoop.username"; + public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal"; + public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab"; public static void generateHdfsParam(Map<String, String> properties, TBrokerRangeDesc rangeDesc) { rangeDesc.setHdfsParams(new THdfsParams()); rangeDesc.hdfs_params.setHdfsConf(new ArrayList<>()); for (Map.Entry<String, String> property : properties.entrySet()) { - if (property.getKey().equalsIgnoreCase(HDFS_FS_KEY)) { + if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) { rangeDesc.hdfs_params.setFsName(property.getValue()); - } else if (property.getKey().equalsIgnoreCase(HDFS_USER_KEY)) { + } else if (property.getKey().equalsIgnoreCase(HADOOP_USER_NAME)) { rangeDesc.hdfs_params.setUser(property.getValue()); - } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_PRINCIPAL)) { - rangeDesc.hdfs_params.setKerbPrincipal(property.getValue()); - } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_TICKET_CACHE_PATH)) { - rangeDesc.hdfs_params.setKerbTicketCachePath(property.getValue()); - } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_TOKEN)) { - rangeDesc.hdfs_params.setToken(property.getValue()); + } else if 
(property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_PRINCIPAL)) { + rangeDesc.hdfs_params.setHdfsKerberosPrincipal(property.getValue()); + } else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_KEYTAB)) { + rangeDesc.hdfs_params.setHdfsKerberosKeytab(property.getValue()); } else { THdfsConf hdfsConf = new THdfsConf(); hdfsConf.setKey(property.getKey()); @@ -171,27 +172,35 @@ public class BrokerUtil { } } } else if (brokerDesc.getStorageType() == StorageBackend.StorageType.HDFS) { - if (!brokerDesc.getProperties().containsKey(HDFS_FS_KEY) - || !brokerDesc.getProperties().containsKey(HDFS_USER_KEY)) { + if (!brokerDesc.getProperties().containsKey(HADOOP_FS_NAME) + || !brokerDesc.getProperties().containsKey(HADOOP_USER_NAME)) { throw new UserException(String.format( - "The properties of hdfs is invalid. %s and %s are needed", HDFS_FS_KEY, HDFS_USER_KEY)); + "The properties of hdfs is invalid. %s and %s are needed", HADOOP_FS_NAME, HADOOP_USER_NAME)); } - String hdfsFsName = brokerDesc.getProperties().get(HDFS_FS_KEY); - String user = brokerDesc.getProperties().get(HDFS_USER_KEY); + String fsName = brokerDesc.getProperties().get(HADOOP_FS_NAME); + String userName = brokerDesc.getProperties().get(HADOOP_USER_NAME); Configuration conf = new Configuration(); + boolean isSecurityEnabled = false; for (Map.Entry<String, String> propEntry : brokerDesc.getProperties().entrySet()) { - if (propEntry.getKey().equals(HDFS_FS_KEY) || propEntry.getKey().equals(HDFS_USER_KEY)) { - continue; - } conf.set(propEntry.getKey(), propEntry.getValue()); + if (propEntry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION) + && propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) { + isSecurityEnabled = true; + } } try { - FileSystem fs = FileSystem.get(new URI(hdfsFsName), conf, user); + if (isSecurityEnabled) { + UserGroupInformation.setConfiguration(conf); + UserGroupInformation.loginUserFromKeytab( + brokerDesc.getProperties().get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL), + brokerDesc.getProperties().get(BrokerUtil.HADOOP_KERBEROS_KEYTAB)); + } + FileSystem fs = FileSystem.get(new URI(fsName), conf, userName); FileStatus[] statusList = fs.globStatus(new Path(path)); for (FileStatus status : statusList) { if (status.isFile()) { fileStatuses.add(new TBrokerFileStatus(status.getPath().toUri().getPath(), - status.isDirectory(), status.getLen(), status.isFile())); + status.isDirectory(), status.getLen(), status.isFile())); } } } catch (IOException | InterruptedException | URISyntaxException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 417b91e145..187646e1d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -68,6 +68,9 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java index a67f5394ca..711e63e695 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java @@ -179,7 +179,7 @@ public class 
HiveScanNode extends BrokerScanNode { } List<TBrokerFileStatus> fileStatuses = new ArrayList<>(); this.hdfsUri = HiveMetaStoreClientHelper.getHiveDataFiles(hiveTable, hivePartitionPredicate, - fileStatuses, remoteHiveTable); + fileStatuses, remoteHiveTable); fileStatusesList.add(fileStatuses); filesAdded += fileStatuses.size(); for (TBrokerFileStatus fstatus : fileStatuses) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/HiveTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/HiveTableTest.java index 00b42a4ec0..c7dad72037 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/HiveTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/HiveTableTest.java @@ -21,6 +21,7 @@ import org.apache.doris.common.DdlException; import com.google.common.collect.Lists; import com.google.common.collect.Maps; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -53,7 +54,8 @@ public class HiveTableTest { public void testNormal() throws DdlException { HiveTable table = new HiveTable(1000, "hive_table", columns, properties); Assert.assertEquals(String.format("%s.%s", hiveDb, hiveTable), table.getHiveDbTable()); - Assert.assertEquals(1, table.getHiveProperties().size()); + // HiveProperties={hadoop.security.authentication=simple, hive.metastore.uris=thrift://127.0.0.1:9083} + Assert.assertEquals(2, table.getHiveProperties().size()); } @Test(expected = DdlException.class) diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index c46b0f3c24..9d9124b056 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -118,10 +118,9 @@ struct THdfsConf { struct THdfsParams { 1: optional string fs_name 2: optional string user - 3: optional string kerb_principal - 4: optional string kerb_ticket_cache_path - 5: optional string token - 6: optional list<THdfsConf> hdfs_conf + 3: optional string hdfs_kerberos_principal + 4: optional string hdfs_kerberos_keytab + 5: optional list<THdfsConf> hdfs_conf } // One broker range information. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org