This is an automated email from the ASF dual-hosted git repository. hellostephen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a5fd82400a0 [fix](kerberos)fix and refactor ugi login for kerberos and simple authentication (#37301) a5fd82400a0 is described below commit a5fd82400a0d40560c0f9c85da987ec296e9d1df Author: slothever <18522955+w...@users.noreply.github.com> AuthorDate: Thu Jul 18 16:04:29 2024 +0800 [fix](kerberos)fix and refactor ugi login for kerberos and simple authentication (#37301) ## Proposed changes Optimize Kerberos UGI login: 1. Support an authentication framework for external tables. 2. Cache the UGI to avoid conflicts when setting the configuration for UGI login. 3. Perform the UGI login only when creating a catalog, which reduces the number of UGI creations. 4. Only simple authentication will use getLoginUser, which avoids conflicts between simple and Kerberos authentication. <!--Describe your changes.--> --- .../kerberos/common/conf/doris-krb5.conf | 2 +- .../authentication/AuthenticationConfig.java | 2 + ...icationConfig.java => HadoopAuthenticator.java} | 32 ++-- .../HadoopKerberosAuthenticator.java | 163 +++++++++++++++++++++ .../authentication/HadoopSimpleAuthenticator.java | 47 ++++++ .../common/security/authentication/HadoopUGI.java | 89 +++-------- ....java => ImpersonatingHadoopAuthenticator.java} | 29 ++-- .../KerberosAuthenticationConfig.java | 3 + .../doris/datasource/hive/HMSCachedClient.java | 5 + .../doris/datasource/hive/HMSExternalCatalog.java | 12 +- .../doris/datasource/hive/HiveMetadataOps.java | 7 +- .../datasource/hive/ThriftHMSCachedClient.java | 12 +- .../apache/doris/fs/remote/RemoteFileSystem.java | 14 +- .../apache/doris/fs/remote/dfs/DFSFileSystem.java | 45 ++++-- .../kerberos/test_two_hive_kerberos.groovy | 33 +++++ 15 files changed, 380 insertions(+), 115 deletions(-) diff --git a/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf b/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf index 7624b94e6ad..36547b8f89d 100644 --- 
a/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf +++ b/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf @@ -24,7 +24,7 @@ default_realm = LABS.TERADATA.COM dns_lookup_realm = false dns_lookup_kdc = false - ticket_lifetime = 24h + ticket_lifetime = 5s # this setting is causing a Message stream modified (41) error when talking to KDC running on CentOS 7: https://stackoverflow.com/a/60978520 # renew_lifetime = 7d forwardable = true diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java index 32a27b2263a..875ae4542e1 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java @@ -26,6 +26,7 @@ public abstract class AuthenticationConfig { public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab"; public static String HIVE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal"; public static String HIVE_KERBEROS_KEYTAB = "hive.metastore.kerberos.keytab.file"; + public static String DORIS_KRB5_DEBUG = "doris.krb5.debug"; /** * @return true if the config is valid, otherwise false. 
@@ -57,6 +58,7 @@ public abstract class AuthenticationConfig { krbConfig.setKerberosPrincipal(conf.get(krbPrincipalKey)); krbConfig.setKerberosKeytab(conf.get(krbKeytabKey)); krbConfig.setConf(conf); + krbConfig.setPrintDebugLog(Boolean.parseBoolean(conf.get(DORIS_KRB5_DEBUG, "false"))); return krbConfig; } else { // AuthType.SIMPLE diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java similarity index 51% copy from fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java copy to fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java index 722cd0352b7..c3cab5f410b 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java @@ -17,18 +17,28 @@ package org.apache.doris.common.security.authentication; -import lombok.Data; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; -@Data -public class KerberosAuthenticationConfig extends AuthenticationConfig { - private String kerberosPrincipal; - private String kerberosKeytab; - private Configuration conf; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; - @Override - public boolean isValid() { - return StringUtils.isNotEmpty(kerberosPrincipal) && StringUtils.isNotEmpty(kerberosKeytab); +public interface HadoopAuthenticator { + + UserGroupInformation getUGI() throws IOException; + + default <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException { + try { + return getUGI().doAs(action); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + 
static HadoopAuthenticator getHadoopAuthenticator(AuthenticationConfig config) { + if (config instanceof KerberosAuthenticationConfig) { + return new HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config); + } else { + return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) config); + } } } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java new file mode 100644 index 00000000000..14dace68b24 --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common.security.authentication; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.trino.plugin.base.authentication.KerberosTicketUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.Collections; +import java.util.Date; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import javax.security.auth.Subject; +import javax.security.auth.kerberos.KerberosPrincipal; +import javax.security.auth.kerberos.KerberosTicket; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.LoginContext; +import javax.security.auth.login.LoginException; + +public class HadoopKerberosAuthenticator implements HadoopAuthenticator { + private static final Logger LOG = LogManager.getLogger(HadoopKerberosAuthenticator.class); + private final KerberosAuthenticationConfig config; + private Subject subject; + private long nextRefreshTime; + private UserGroupInformation ugi; + + public HadoopKerberosAuthenticator(KerberosAuthenticationConfig config) { + this.config = config; + } + + public static void initializeAuthConfig(Configuration hadoopConf) { + hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); + synchronized (HadoopKerberosAuthenticator.class) { + // avoid other catalog set conf at the same time + UserGroupInformation.setConfiguration(hadoopConf); + } + } + + @Override + public synchronized UserGroupInformation getUGI() throws IOException { + if (ugi == null) { + subject = getSubject(config.getKerberosKeytab(), config.getKerberosPrincipal(), config.isPrintDebugLog()); + ugi = Objects.requireNonNull(login(subject), "login 
result is null"); + return ugi; + } + if (nextRefreshTime < System.currentTimeMillis()) { + long lastRefreshTime = nextRefreshTime; + Subject existingSubject = subject; + if (LOG.isDebugEnabled()) { + Date lastTicketEndTime = getTicketEndTime(subject); + LOG.debug("Current ticket expired time is {}", lastTicketEndTime); + } + // renew subject + Subject newSubject = getSubject(config.getKerberosKeytab(), config.getKerberosPrincipal(), + config.isPrintDebugLog()); + Objects.requireNonNull(login(newSubject), "re-login result is null"); + // modify UGI instead of returning new UGI + existingSubject.getPrincipals().addAll(newSubject.getPrincipals()); + Set<Object> privateCredentials = existingSubject.getPrivateCredentials(); + // clear the old credentials + synchronized (privateCredentials) { + privateCredentials.clear(); + privateCredentials.addAll(newSubject.getPrivateCredentials()); + } + Set<Object> publicCredentials = existingSubject.getPublicCredentials(); + synchronized (publicCredentials) { + publicCredentials.clear(); + publicCredentials.addAll(newSubject.getPublicCredentials()); + } + nextRefreshTime = calculateNextRefreshTime(newSubject); + if (LOG.isDebugEnabled()) { + Date lastTicketEndTime = getTicketEndTime(newSubject); + LOG.debug("Next ticket expired time is {}", lastTicketEndTime); + LOG.debug("Refresh kerberos ticket succeeded, last time is {}, next time is {}", + lastRefreshTime, nextRefreshTime); + } + } + return ugi; + } + + private UserGroupInformation login(Subject subject) throws IOException { + // login and get ugi when catalog is initialized + initializeAuthConfig(config.getConf()); + String principal = config.getKerberosPrincipal(); + if (LOG.isDebugEnabled()) { + LOG.debug("Login by kerberos authentication with principal: {}", principal); + } + return UserGroupInformation.getUGIFromSubject(subject); + } + + private static long calculateNextRefreshTime(Subject subject) { + Preconditions.checkArgument(subject != null, "subject must be present 
in kerberos based UGI"); + KerberosTicket tgtTicket = KerberosTicketUtils.getTicketGrantingTicket(subject); + return KerberosTicketUtils.getRefreshTime(tgtTicket); + } + + private static Date getTicketEndTime(Subject subject) { + Preconditions.checkArgument(subject != null, "subject must be present in kerberos based UGI"); + KerberosTicket tgtTicket = KerberosTicketUtils.getTicketGrantingTicket(subject); + return tgtTicket.getEndTime(); + } + + private static Subject getSubject(String keytab, String principal, boolean printDebugLog) { + Subject subject = new Subject(false, ImmutableSet.of(new KerberosPrincipal(principal)), + Collections.emptySet(), Collections.emptySet()); + javax.security.auth.login.Configuration conf = getConfiguration(keytab, principal, printDebugLog); + try { + LoginContext loginContext = new LoginContext("", subject, null, conf); + loginContext.login(); + return loginContext.getSubject(); + } catch (LoginException e) { + throw new RuntimeException(e); + } + } + + private static javax.security.auth.login.Configuration getConfiguration(String keytab, String principal, + boolean printDebugLog) { + return new javax.security.auth.login.Configuration() { + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String name) { + ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder() + .put("doNotPrompt", "true") + .put("isInitiator", "true") + .put("useKeyTab", "true") + .put("storeKey", "true") + .put("keyTab", keytab) + .put("principal", principal); + if (printDebugLog) { + builder.put("debug", "true"); + } + Map<String, String> options = builder.build(); + return new AppConfigurationEntry[]{ + new AppConfigurationEntry( + "com.sun.security.auth.module.Krb5LoginModule", + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, + options)}; + } + }; + } +} diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java new file mode 100644 index 00000000000..fbe0d0aba7d --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common.security.authentication; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class HadoopSimpleAuthenticator implements HadoopAuthenticator { + private static final Logger LOG = LogManager.getLogger(HadoopSimpleAuthenticator.class); + private final UserGroupInformation ugi; + + public HadoopSimpleAuthenticator(SimpleAuthenticationConfig config) { + String hadoopUserName = config.getUsername(); + if (hadoopUserName == null) { + hadoopUserName = "hadoop"; + config.setUsername(hadoopUserName); + if (LOG.isDebugEnabled()) { + LOG.debug("{} is unset, use default user: hadoop", AuthenticationConfig.HADOOP_USER_NAME); + } + } + ugi = UserGroupInformation.createRemoteUser(hadoopUserName); + if (LOG.isDebugEnabled()) { + LOG.debug("Login by proxy user, hadoop.username: {}", hadoopUserName); + } + } + + @Override + public UserGroupInformation getUGI() { + return ugi; + } +} diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java index 1a86b9e327a..d04d772728b 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java @@ -18,8 +18,6 @@ package org.apache.doris.common.security.authentication; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.security.UserGroupInformation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -27,6 +25,7 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; import java.security.PrivilegedExceptionAction; +@Deprecated public class HadoopUGI { private static final 
Logger LOG = LogManager.getLogger(HadoopUGI.class); @@ -39,82 +38,30 @@ public class HadoopUGI { if (config == null || !config.isValid()) { return null; } - UserGroupInformation ugi; if (config instanceof KerberosAuthenticationConfig) { - KerberosAuthenticationConfig krbConfig = (KerberosAuthenticationConfig) config; - Configuration hadoopConf = krbConfig.getConf(); - hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); - hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED, "true"); - UserGroupInformation.setConfiguration(hadoopConf); - String principal = krbConfig.getKerberosPrincipal(); try { - // login hadoop with keytab and try checking TGT - ugi = UserGroupInformation.getLoginUser(); - LOG.debug("Current login user: {}", ugi.getUserName()); - if (ugi.hasKerberosCredentials() && StringUtils.equals(ugi.getUserName(), principal)) { - // if the current user is logged by kerberos and is the same user - // just use checkTGTAndReloginFromKeytab because this method will only relogin - // when the TGT is expired or is close to expiry - ugi.checkTGTAndReloginFromKeytab(); - return ugi; + // TODO: remove after iceberg and hudi kerberos test case pass + try { + // login hadoop with keytab and try checking TGT + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + LOG.debug("Current login user: {}", ugi.getUserName()); + String principal = ((KerberosAuthenticationConfig) config).getKerberosPrincipal(); + if (ugi.hasKerberosCredentials() && StringUtils.equals(ugi.getUserName(), principal)) { + // if the current user is logged by kerberos and is the same user + // just use checkTGTAndReloginFromKeytab because this method will only relogin + // when the TGT is expired or is close to expiry + ugi.checkTGTAndReloginFromKeytab(); + return ugi; + } + } catch (IOException e) { + LOG.warn("A SecurityException occurs with kerberos, do login immediately.", e); } - } catch (IOException e) { - 
LOG.warn("A SecurityException occurs with kerberos, do login immediately.", e); - } - try { - ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, krbConfig.getKerberosKeytab()); - UserGroupInformation.setLoginUser(ugi); - LOG.debug("Login by kerberos authentication with principal: {}", principal); - return ugi; + return new HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config).getUGI(); } catch (IOException e) { throw new RuntimeException(e); } } else { - String hadoopUserName = ((SimpleAuthenticationConfig) config).getUsername(); - if (hadoopUserName == null) { - hadoopUserName = "hadoop"; - ((SimpleAuthenticationConfig) config).setUsername(hadoopUserName); - LOG.debug(AuthenticationConfig.HADOOP_USER_NAME + " is unset, use default user: hadoop"); - } - - try { - ugi = UserGroupInformation.getLoginUser(); - if (ugi.getUserName().equals(hadoopUserName)) { - return ugi; - } - } catch (IOException e) { - LOG.warn("A SecurityException occurs with simple, do login immediately.", e); - } - - ugi = UserGroupInformation.createRemoteUser(hadoopUserName); - UserGroupInformation.setLoginUser(ugi); - LOG.debug("Login by proxy user, hadoop.username: {}", hadoopUserName); - return ugi; - } - } - - /** - * use for HMSExternalCatalog to login - * @param config auth config - */ - public static void tryKrbLogin(String catalogName, AuthenticationConfig config) { - if (config instanceof KerberosAuthenticationConfig) { - KerberosAuthenticationConfig krbConfig = (KerberosAuthenticationConfig) config; - try { - Configuration hadoopConf = krbConfig.getConf(); - hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); - hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED, "true"); - UserGroupInformation.setConfiguration(hadoopConf); - /** - * Because metastore client is created by using - * {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient#getProxy} - * it will relogin when 
TGT is expired, so we don't need to relogin manually. - */ - UserGroupInformation.loginUserFromKeytab(krbConfig.getKerberosPrincipal(), - krbConfig.getKerberosKeytab()); - } catch (IOException e) { - throw new RuntimeException("login with kerberos auth failed for catalog: " + catalogName, e); - } + return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) config).getUGI(); } } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java similarity index 54% copy from fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java copy to fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java index 722cd0352b7..10e42f4bc67 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java @@ -17,18 +17,27 @@ package org.apache.doris.common.security.authentication; -import lombok.Data; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; -@Data -public class KerberosAuthenticationConfig extends AuthenticationConfig { - private String kerberosPrincipal; - private String kerberosKeytab; - private Configuration conf; +import java.io.IOException; +import java.util.Objects; + +public class ImpersonatingHadoopAuthenticator implements HadoopAuthenticator { + + private final HadoopAuthenticator delegate; + private final String username; + private UserGroupInformation ugi; + + public ImpersonatingHadoopAuthenticator(HadoopAuthenticator delegate, String username) { + this.delegate = Objects.requireNonNull(delegate); + 
this.username = Objects.requireNonNull(username); + } @Override - public boolean isValid() { - return StringUtils.isNotEmpty(kerberosPrincipal) && StringUtils.isNotEmpty(kerberosKeytab); + public synchronized UserGroupInformation getUGI() throws IOException { + if (ugi == null) { + ugi = UserGroupInformation.createProxyUser(username, delegate.getUGI()); + } + return ugi; } } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java index 722cd0352b7..adf76274386 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java @@ -18,14 +18,17 @@ package org.apache.doris.common.security.authentication; import lombok.Data; +import lombok.EqualsAndHashCode; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +@EqualsAndHashCode(callSuper = true) @Data public class KerberosAuthenticationConfig extends AuthenticationConfig { private String kerberosPrincipal; private String kerberosKeytab; private Configuration conf; + private boolean printDebugLog; @Override public boolean isValid() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java index 1ac77972053..a5e0eefb348 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java @@ -18,6 +18,7 @@ package org.apache.doris.datasource.hive; import org.apache.doris.analysis.TableName; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.datasource.DatabaseMetadata; 
import org.apache.doris.datasource.TableMetadata; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; @@ -113,6 +114,10 @@ public interface HMSCachedClient { void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData); + default void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) { + // Ignored by default + } + /** * close the connection, eg, to hms */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java index 91192b63c24..a22eacaf1e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java @@ -24,7 +24,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.security.authentication.AuthenticationConfig; -import org.apache.doris.common.security.authentication.HadoopUGI; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalDatabase; @@ -40,7 +40,9 @@ import org.apache.doris.fs.FileSystemProviderImpl; import org.apache.doris.fs.remote.dfs.DFSFileSystem; import org.apache.doris.transaction.TransactionManagerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; +import lombok.Getter; import org.apache.commons.lang3.math.NumberUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.logging.log4j.LogManager; @@ -68,7 +70,10 @@ public class HMSExternalCatalog extends ExternalCatalog { private static final int FILE_SYSTEM_EXECUTOR_THREAD_NUM = 16; private ThreadPoolExecutor fileSystemExecutor; + 
@Getter + private HadoopAuthenticator authenticator; + @VisibleForTesting public HMSExternalCatalog() { catalogProperty = new CatalogProperty(null, null); } @@ -81,6 +86,8 @@ public class HMSExternalCatalog extends ExternalCatalog { super(catalogId, name, InitCatalogLog.Type.HMS, comment); props = PropertyConverter.convertToMetaProperties(props); catalogProperty = new CatalogProperty(resource, props); + AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration()); + authenticator = HadoopAuthenticator.getHadoopAuthenticator(config); } @Override @@ -148,9 +155,6 @@ public class HMSExternalCatalog extends ExternalCatalog { } hiveConf.set(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(), String.valueOf(Config.hive_metastore_client_timeout_second)); - HadoopUGI.tryKrbLogin(this.getName(), AuthenticationConfig.getKerberosConfig(hiveConf, - AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, - AuthenticationConfig.HADOOP_KERBEROS_KEYTAB)); } HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this); FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index 7267297c93e..f438f5e1782 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -32,6 +32,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.jdbc.client.JdbcClient; import 
org.apache.doris.datasource.jdbc.client.JdbcClientConfig; @@ -60,11 +61,14 @@ public class HiveMetadataOps implements ExternalMetadataOps { private static final int MIN_CLIENT_POOL_SIZE = 8; private final HMSCachedClient client; private final HMSExternalCatalog catalog; + private HadoopAuthenticator hadoopAuthenticator; public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) { this(catalog, createCachedClient(hiveConf, Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size), jdbcClientConfig)); + hadoopAuthenticator = catalog.getAuthenticator(); + client.setHadoopAuthenticator(hadoopAuthenticator); } @VisibleForTesting @@ -84,7 +88,8 @@ public class HiveMetadataOps implements ExternalMetadataOps { private static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize, JdbcClientConfig jdbcClientConfig) { if (hiveConf != null) { - return new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize); + ThriftHMSCachedClient client = new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize); + return client; } Preconditions.checkNotNull(jdbcClientConfig, "hiveConf and jdbcClientConfig are both null"); String dbType = JdbcClient.parseDbType(jdbcClientConfig.getJdbcUrl()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java index 55d8ffc2e02..7632101d7ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java @@ -20,6 +20,7 @@ package org.apache.doris.datasource.hive; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Column; import org.apache.doris.common.Config; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.datasource.DatabaseMetadata; 
import org.apache.doris.datasource.TableMetadata; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; @@ -92,6 +93,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { private boolean isClosed = false; private final int poolSize; private final HiveConf hiveConf; + private HadoopAuthenticator hadoopAuthenticator; public ThriftHMSCachedClient(HiveConf hiveConf, int poolSize) { Preconditions.checkArgument(poolSize > 0, poolSize); @@ -104,6 +106,10 @@ public class ThriftHMSCachedClient implements HMSCachedClient { this.isClosed = false; } + public void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) { + this.hadoopAuthenticator = hadoopAuthenticator; + } + @Override public void close() { synchronized (clientPool) { @@ -678,7 +684,11 @@ public class ThriftHMSCachedClient implements HMSCachedClient { } private <T> T ugiDoAs(PrivilegedExceptionAction<T> action) { - return HiveMetaStoreClientHelper.ugiDoAs(hiveConf, action); + try { + return hadoopAuthenticator.doAs(action); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java index 68de3a8fdef..3cb8a036c2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import java.io.FileNotFoundException; +import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Set; @@ -56,7 +57,7 @@ public abstract class RemoteFileSystem extends PersistentFileSystem { try { org.apache.hadoop.fs.FileSystem fileSystem = nativeFileSystem(remotePath); Path locatedPath = new Path(remotePath); - RemoteIterator<LocatedFileStatus> locatedFiles = 
fileSystem.listFiles(locatedPath, recursive); + RemoteIterator<LocatedFileStatus> locatedFiles = getLocatedFiles(recursive, fileSystem, locatedPath); while (locatedFiles.hasNext()) { LocatedFileStatus fileStatus = locatedFiles.next(); RemoteFile location = new RemoteFile( @@ -72,11 +73,16 @@ public abstract class RemoteFileSystem extends PersistentFileSystem { return Status.OK; } + protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean recursive, + FileSystem fileSystem, Path locatedPath) throws IOException { + return fileSystem.listFiles(locatedPath, recursive); + } + @Override public Status listDirectories(String remotePath, Set<String> result) { try { FileSystem fileSystem = nativeFileSystem(remotePath); - FileStatus[] fileStatuses = fileSystem.listStatus(new Path(remotePath)); + FileStatus[] fileStatuses = getFileStatuses(remotePath, fileSystem); result.addAll( Arrays.stream(fileStatuses) .filter(FileStatus::isDirectory) @@ -88,6 +94,10 @@ public abstract class RemoteFileSystem extends PersistentFileSystem { return Status.OK; } + protected FileStatus[] getFileStatuses(String remotePath, FileSystem fileSystem) throws IOException { + return fileSystem.listStatus(new Path(remotePath)); + } + @Override public Status renameDir(String origFilePath, String destFilePath, diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index 5532f1187fe..59fbd73bda7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -21,7 +21,7 @@ import org.apache.doris.analysis.StorageBackend; import org.apache.doris.backup.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.security.authentication.AuthenticationConfig; -import org.apache.doris.common.security.authentication.HadoopUGI; +import 
org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.common.util.URI; import org.apache.doris.fs.operations.HDFSFileOperations; import org.apache.doris.fs.operations.HDFSOpParams; @@ -35,7 +35,9 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -59,6 +61,7 @@ public class DFSFileSystem extends RemoteFileSystem { public static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed"; private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class); private HDFSFileOperations operations = null; + private HadoopAuthenticator authenticator = null; public DFSFileSystem(Map<String, String> properties) { this(StorageBackend.StorageType.HDFS, properties); @@ -79,14 +82,19 @@ public class DFSFileSystem extends RemoteFileSystem { for (Map.Entry<String, String> propEntry : properties.entrySet()) { conf.set(propEntry.getKey(), propEntry.getValue()); } - - dfsFileSystem = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> { - try { - return FileSystem.get(new Path(remotePath).toUri(), conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf); + authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig); + try { + dfsFileSystem = authenticator.doAs(() -> { + try { + return FileSystem.get(new Path(remotePath).toUri(), conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } catch (Exception e) { + throw new UserException(e); + } operations = new 
HDFSFileOperations(dfsFileSystem); } } @@ -94,6 +102,15 @@ public class DFSFileSystem extends RemoteFileSystem { return dfsFileSystem; } + protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean recursive, + FileSystem fileSystem, Path locatedPath) throws IOException { + return authenticator.doAs(() -> fileSystem.listFiles(locatedPath, recursive)); + } + + protected FileStatus[] getFileStatuses(String remotePath, FileSystem fileSystem) throws IOException { + return authenticator.doAs(() -> fileSystem.listStatus(new Path(remotePath))); + } + public static Configuration getHdfsConf(boolean fallbackToSimpleAuth) { Configuration hdfsConf = new HdfsConfiguration(); if (fallbackToSimpleAuth) { @@ -265,7 +282,7 @@ public class DFSFileSystem extends RemoteFileSystem { URI pathUri = URI.create(remotePath); Path inputFilePath = new Path(pathUri.getPath()); FileSystem fileSystem = nativeFileSystem(remotePath); - boolean isPathExist = fileSystem.exists(inputFilePath); + boolean isPathExist = authenticator.doAs(() -> fileSystem.exists(inputFilePath)); if (!isPathExist) { return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath); } @@ -380,7 +397,7 @@ public class DFSFileSystem extends RemoteFileSystem { FileSystem fileSystem = nativeFileSystem(destPath); Path srcfilePath = new Path(srcPathUri.getPath()); Path destfilePath = new Path(destPathUri.getPath()); - boolean isRenameSuccess = fileSystem.rename(srcfilePath, destfilePath); + boolean isRenameSuccess = authenticator.doAs(() -> fileSystem.rename(srcfilePath, destfilePath)); if (!isRenameSuccess) { return new Status(Status.ErrCode.COMMON_ERROR, "failed to rename " + srcPath + " to " + destPath); } @@ -401,7 +418,7 @@ public class DFSFileSystem extends RemoteFileSystem { URI pathUri = URI.create(remotePath); Path inputFilePath = new Path(pathUri.getPath()); FileSystem fileSystem = nativeFileSystem(remotePath); - fileSystem.delete(inputFilePath, true); + authenticator.doAs(() -> 
fileSystem.delete(inputFilePath, true)); } catch (UserException e) { return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); } catch (IOException e) { @@ -427,7 +444,7 @@ public class DFSFileSystem extends RemoteFileSystem { URI pathUri = URI.create(remotePath); FileSystem fileSystem = nativeFileSystem(remotePath); Path pathPattern = new Path(pathUri.getPath()); - FileStatus[] files = fileSystem.globStatus(pathPattern); + FileStatus[] files = authenticator.doAs(() -> fileSystem.globStatus(pathPattern)); if (files == null) { LOG.info("no files in path " + remotePath); return Status.OK; @@ -454,7 +471,7 @@ public class DFSFileSystem extends RemoteFileSystem { public Status makeDir(String remotePath) { try { FileSystem fileSystem = nativeFileSystem(remotePath); - if (!fileSystem.mkdirs(new Path(remotePath))) { + if (!authenticator.doAs(() -> fileSystem.mkdirs(new Path(remotePath)))) { LOG.warn("failed to make dir for " + remotePath); return new Status(Status.ErrCode.COMMON_ERROR, "failed to make dir for " + remotePath); } diff --git a/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy b/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy index a3b39d1221a..7e7f276236a 100644 --- a/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy +++ b/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy @@ -1,3 +1,5 @@ +import groovyjarjarantlr4.v4.codegen.model.ExceptionClause + // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information @@ -15,6 +17,8 @@ // specific language governing permissions and limitations // under the License. 
+import org.junit.Assert; + suite("test_two_hive_kerberos", "p0,external,kerberos,external_docker,external_docker_kerberos") { String enabled = context.config.otherConfigs.get("enableKerberosTest") if (enabled != null && enabled.equalsIgnoreCase("true")) { @@ -66,7 +70,36 @@ suite("test_two_hive_kerberos", "p0,external,kerberos,external_docker,external_d sql """ use test_krb_hive_db """ order_qt_q02 """ select * from test_krb_hive_db.test_krb_hive_tbl """ + // 3. multi thread test + Thread thread1 = new Thread(() -> { + try { + for (int i = 0; i < 1000; i++) { + sql """ select * from ${hms_catalog_name}.test_krb_hive_db.test_krb_hive_tbl """ + } + } catch (Exception e) { + log.info(e.getMessage()) + Assert.fail(); + } + }) + + Thread thread2 = new Thread(() -> { + try { + for (int i = 0; i < 1000; i++) { + sql """ select * from other_${hms_catalog_name}.test_krb_hive_db.test_krb_hive_tbl """ + } + } catch (Exception e) { + log.info(e.getMessage()) + Assert.fail(); + } + }) + sleep(5000L) + thread1.start() + thread2.start() + + thread1.join() + thread2.join() sql """drop catalog ${hms_catalog_name};""" sql """drop catalog other_${hms_catalog_name};""" + // TODO: add tvf case } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org