This is an automated email from the ASF dual-hosted git repository.

hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a5fd82400a0 [fix](kerberos)fix and refactor ugi login for kerberos and simple authentication (#37301)
a5fd82400a0 is described below

commit a5fd82400a0d40560c0f9c85da987ec296e9d1df
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Thu Jul 18 16:04:29 2024 +0800

    [fix](kerberos)fix and refactor ugi login for kerberos and simple authentication (#37301)
    
    ## Proposed changes
    
    Optimize Kerberos UGI login:
    1. Support an authentication framework for external tables.
    2. Cache the UGI to avoid conflicts when setting the configuration for UGI login.
    3. Perform the UGI login only when the catalog is created, which reduces the number of UGI creations.
    4. Use getLoginUser only for simple authentication, which avoids conflicts between simple and Kerberos authentication.
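    
    A minimal usage sketch of the new entry point (illustrative only; `conf`
    stands for a caller-supplied Hadoop Configuration and the HDFS URI is a
    placeholder):
    
        AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(conf);
        HadoopAuthenticator auth = HadoopAuthenticator.getHadoopAuthenticator(config);
        // doAs delegates to UserGroupInformation.doAs and rethrows
        // InterruptedException as an IOException
        FileSystem fs = auth.doAs(() -> FileSystem.get(java.net.URI.create("hdfs://nn:8020"), conf));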
---
 .../kerberos/common/conf/doris-krb5.conf           |   2 +-
 .../authentication/AuthenticationConfig.java       |   2 +
 ...icationConfig.java => HadoopAuthenticator.java} |  32 ++--
 .../HadoopKerberosAuthenticator.java               | 163 +++++++++++++++++++++
 .../authentication/HadoopSimpleAuthenticator.java  |  47 ++++++
 .../common/security/authentication/HadoopUGI.java  |  89 +++--------
 ....java => ImpersonatingHadoopAuthenticator.java} |  29 ++--
 .../KerberosAuthenticationConfig.java              |   3 +
 .../doris/datasource/hive/HMSCachedClient.java     |   5 +
 .../doris/datasource/hive/HMSExternalCatalog.java  |  12 +-
 .../doris/datasource/hive/HiveMetadataOps.java     |   7 +-
 .../datasource/hive/ThriftHMSCachedClient.java     |  12 +-
 .../apache/doris/fs/remote/RemoteFileSystem.java   |  14 +-
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  |  45 ++++--
 .../kerberos/test_two_hive_kerberos.groovy         |  33 +++++
 15 files changed, 380 insertions(+), 115 deletions(-)

diff --git a/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf b/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf
index 7624b94e6ad..36547b8f89d 100644
--- a/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf
+++ b/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf
@@ -24,7 +24,7 @@
  default_realm = LABS.TERADATA.COM
  dns_lookup_realm = false
  dns_lookup_kdc = false
- ticket_lifetime = 24h
+ ticket_lifetime = 5s
  # this setting is causing a Message stream modified (41) error when talking to KDC running on CentOS 7: https://stackoverflow.com/a/60978520
  # renew_lifetime = 7d
  forwardable = true
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
index 32a27b2263a..875ae4542e1 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
@@ -26,6 +26,7 @@ public abstract class AuthenticationConfig {
     public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
     public static String HIVE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal";
     public static String HIVE_KERBEROS_KEYTAB = "hive.metastore.kerberos.keytab.file";
+    public static String DORIS_KRB5_DEBUG = "doris.krb5.debug";
 
     /**
      * @return true if the config is valid, otherwise false.
@@ -57,6 +58,7 @@ public abstract class AuthenticationConfig {
             krbConfig.setKerberosPrincipal(conf.get(krbPrincipalKey));
             krbConfig.setKerberosKeytab(conf.get(krbKeytabKey));
             krbConfig.setConf(conf);
+            krbConfig.setPrintDebugLog(Boolean.parseBoolean(conf.get(DORIS_KRB5_DEBUG, "false")));
             return krbConfig;
         } else {
             // AuthType.SIMPLE
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
similarity index 51%
copy from fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
copy to fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
index 722cd0352b7..c3cab5f410b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
@@ -17,18 +17,28 @@
 
 package org.apache.doris.common.security.authentication;
 
-import lombok.Data;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 
-@Data
-public class KerberosAuthenticationConfig extends AuthenticationConfig {
-    private String kerberosPrincipal;
-    private String kerberosKeytab;
-    private Configuration conf;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 
-    @Override
-    public boolean isValid() {
-        return StringUtils.isNotEmpty(kerberosPrincipal) && StringUtils.isNotEmpty(kerberosKeytab);
+public interface HadoopAuthenticator {
+
+    UserGroupInformation getUGI() throws IOException;
+
+    default <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException {
+        try {
+            return getUGI().doAs(action);
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    static HadoopAuthenticator getHadoopAuthenticator(AuthenticationConfig config) {
+        if (config instanceof KerberosAuthenticationConfig) {
+            return new HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config);
+        } else {
+            return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) config);
+        }
     }
 }
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java
new file mode 100644
index 00000000000..14dace68b24
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import io.trino.plugin.base.authentication.KerberosTicketUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+
+public class HadoopKerberosAuthenticator implements HadoopAuthenticator {
+    private static final Logger LOG = LogManager.getLogger(HadoopKerberosAuthenticator.class);
+    private final KerberosAuthenticationConfig config;
+    private Subject subject;
+    private long nextRefreshTime;
+    private UserGroupInformation ugi;
+
+    public HadoopKerberosAuthenticator(KerberosAuthenticationConfig config) {
+        this.config = config;
+    }
+
+    public static void initializeAuthConfig(Configuration hadoopConf) {
+        hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+        synchronized (HadoopKerberosAuthenticator.class) {
+            // avoid other catalog set conf at the same time
+            UserGroupInformation.setConfiguration(hadoopConf);
+        }
+    }
+
+    @Override
+    public synchronized UserGroupInformation getUGI() throws IOException {
+        if (ugi == null) {
+            subject = getSubject(config.getKerberosKeytab(), config.getKerberosPrincipal(), config.isPrintDebugLog());
+            ugi = Objects.requireNonNull(login(subject), "login result is null");
+            return ugi;
+        }
+        if (nextRefreshTime < System.currentTimeMillis()) {
+            long lastRefreshTime = nextRefreshTime;
+            Subject existingSubject = subject;
+            if (LOG.isDebugEnabled()) {
+                Date lastTicketEndTime = getTicketEndTime(subject);
+                LOG.debug("Current ticket expired time is {}", 
lastTicketEndTime);
+            }
+            // renew subject
+            Subject newSubject = getSubject(config.getKerberosKeytab(), config.getKerberosPrincipal(),
+                    config.isPrintDebugLog());
+            Objects.requireNonNull(login(newSubject), "re-login result is null");
+            // modify UGI instead of returning new UGI
+            existingSubject.getPrincipals().addAll(newSubject.getPrincipals());
+            Set<Object> privateCredentials = existingSubject.getPrivateCredentials();
+            // clear the old credentials
+            synchronized (privateCredentials) {
+                privateCredentials.clear();
+                privateCredentials.addAll(newSubject.getPrivateCredentials());
+            }
+            Set<Object> publicCredentials = existingSubject.getPublicCredentials();
+            synchronized (publicCredentials) {
+                publicCredentials.clear();
+                publicCredentials.addAll(newSubject.getPublicCredentials());
+            }
+            nextRefreshTime = calculateNextRefreshTime(newSubject);
+            if (LOG.isDebugEnabled()) {
+                Date lastTicketEndTime = getTicketEndTime(newSubject);
+                LOG.debug("Next ticket expired time is {}", lastTicketEndTime);
+                LOG.debug("Refresh kerberos ticket succeeded, last time is {}, 
next time is {}",
+                        lastRefreshTime, nextRefreshTime);
+            }
+        }
+        return ugi;
+    }
+
+    private UserGroupInformation login(Subject subject) throws IOException {
+        // login and get ugi when catalog is initialized
+        initializeAuthConfig(config.getConf());
+        String principal = config.getKerberosPrincipal();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Login by kerberos authentication with principal: {}", 
principal);
+        }
+        return UserGroupInformation.getUGIFromSubject(subject);
+    }
+
+    private static long calculateNextRefreshTime(Subject subject) {
+        Preconditions.checkArgument(subject != null, "subject must be present in kerberos based UGI");
+        KerberosTicket tgtTicket = KerberosTicketUtils.getTicketGrantingTicket(subject);
+        return KerberosTicketUtils.getRefreshTime(tgtTicket);
+    }
+
+    private static Date getTicketEndTime(Subject subject) {
+        Preconditions.checkArgument(subject != null, "subject must be present in kerberos based UGI");
+        KerberosTicket tgtTicket = KerberosTicketUtils.getTicketGrantingTicket(subject);
+        return tgtTicket.getEndTime();
+    }
+
+    private static Subject getSubject(String keytab, String principal, boolean printDebugLog) {
+        Subject subject = new Subject(false, ImmutableSet.of(new KerberosPrincipal(principal)),
+                Collections.emptySet(), Collections.emptySet());
+        javax.security.auth.login.Configuration conf = getConfiguration(keytab, principal, printDebugLog);
+        try {
+            LoginContext loginContext = new LoginContext("", subject, null, conf);
+            loginContext.login();
+            return loginContext.getSubject();
+        } catch (LoginException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static javax.security.auth.login.Configuration getConfiguration(String keytab, String principal,
+                                                                            boolean printDebugLog) {
+        return new javax.security.auth.login.Configuration() {
+            @Override
+            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+                ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder()
+                        .put("doNotPrompt", "true")
+                        .put("isInitiator", "true")
+                        .put("useKeyTab", "true")
+                        .put("storeKey", "true")
+                        .put("keyTab", keytab)
+                        .put("principal", principal);
+                if (printDebugLog) {
+                    builder.put("debug", "true");
+                }
+                Map<String, String> options = builder.build();
+                return new AppConfigurationEntry[]{
+                    new AppConfigurationEntry(
+                        "com.sun.security.auth.module.Krb5LoginModule",
+                        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+                        options)};
+            }
+        };
+    }
+}
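
A minimal construction sketch (illustrative; the principal and keytab path are placeholders, and the setters come from Lombok's @Data on KerberosAuthenticationConfig):

    KerberosAuthenticationConfig cfg = new KerberosAuthenticationConfig();
    cfg.setKerberosPrincipal("doris/_HOST@EXAMPLE.COM"); // placeholder principal
    cfg.setKerberosKeytab("/path/to/doris.keytab");      // placeholder keytab path
    cfg.setConf(new Configuration());
    cfg.setPrintDebugLog(true); // enables the "debug" JAAS option shown above
    HadoopKerberosAuthenticator auth = new HadoopKerberosAuthenticator(cfg);
    UserGroupInformation ugi = auth.getUGI(); // first call logs in; later calls refresh the TGT lazily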
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java
new file mode 100644
index 00000000000..fbe0d0aba7d
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class HadoopSimpleAuthenticator implements HadoopAuthenticator {
+    private static final Logger LOG = LogManager.getLogger(HadoopSimpleAuthenticator.class);
+    private final UserGroupInformation ugi;
+
+    public HadoopSimpleAuthenticator(SimpleAuthenticationConfig config) {
+        String hadoopUserName = config.getUsername();
+        if (hadoopUserName == null) {
+            hadoopUserName = "hadoop";
+            config.setUsername(hadoopUserName);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("{} is unset, use default user: hadoop", 
AuthenticationConfig.HADOOP_USER_NAME);
+            }
+        }
+        ugi = UserGroupInformation.createRemoteUser(hadoopUserName);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Login by proxy user, hadoop.username: {}", 
hadoopUserName);
+        }
+    }
+
+    @Override
+    public UserGroupInformation getUGI() {
+        return ugi;
+    }
+}
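
A minimal usage sketch for the simple path (illustrative; a no-arg constructor on SimpleAuthenticationConfig is assumed, while getUsername/setUsername are used by the class above):

    SimpleAuthenticationConfig cfg = new SimpleAuthenticationConfig(); // assumed no-arg constructor
    cfg.setUsername("hadoop"); // leaving it unset also falls back to "hadoop"
    HadoopAuthenticator auth = new HadoopSimpleAuthenticator(cfg);
    UserGroupInformation ugi = auth.getUGI(); // a remote user without Kerberos credentials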
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
index 1a86b9e327a..d04d772728b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
@@ -18,8 +18,6 @@
 package org.apache.doris.common.security.authentication;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -27,6 +25,7 @@ import org.apache.logging.log4j.Logger;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
+@Deprecated
 public class HadoopUGI {
     private static final Logger LOG = LogManager.getLogger(HadoopUGI.class);
 
@@ -39,82 +38,30 @@ public class HadoopUGI {
         if (config == null || !config.isValid()) {
             return null;
         }
-        UserGroupInformation ugi;
         if (config instanceof KerberosAuthenticationConfig) {
-            KerberosAuthenticationConfig krbConfig = (KerberosAuthenticationConfig) config;
-            Configuration hadoopConf = krbConfig.getConf();
-            hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-            hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED, "true");
-            UserGroupInformation.setConfiguration(hadoopConf);
-            String principal = krbConfig.getKerberosPrincipal();
             try {
-                //  login hadoop with keytab and try checking TGT
-                ugi = UserGroupInformation.getLoginUser();
-                LOG.debug("Current login user: {}", ugi.getUserName());
-                if (ugi.hasKerberosCredentials() && StringUtils.equals(ugi.getUserName(), principal)) {
-                    // if the current user is logged by kerberos and is the 
same user
-                    // just use checkTGTAndReloginFromKeytab because this method will only relogin
-                    // when the TGT is expired or is close to expiry
-                    ugi.checkTGTAndReloginFromKeytab();
-                    return ugi;
+                // TODO: remove after iceberg and hudi kerberos test case pass
+                try {
+                    //  login hadoop with keytab and try checking TGT
+                    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+                    LOG.debug("Current login user: {}", ugi.getUserName());
+                    String principal = ((KerberosAuthenticationConfig) config).getKerberosPrincipal();
+                    if (ugi.hasKerberosCredentials() && StringUtils.equals(ugi.getUserName(), principal)) {
+                        // if the current user is logged by kerberos and is the same user
+                        // just use checkTGTAndReloginFromKeytab because this method will only relogin
+                        // when the TGT is expired or is close to expiry
+                        ugi.checkTGTAndReloginFromKeytab();
+                        return ugi;
+                    }
+                } catch (IOException e) {
+                    LOG.warn("A SecurityException occurs with kerberos, do login immediately.", e);
                 }
-            } catch (IOException e) {
-                LOG.warn("A SecurityException occurs with kerberos, do login immediately.", e);
-            }
-            try {
-                ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, krbConfig.getKerberosKeytab());
-                UserGroupInformation.setLoginUser(ugi);
-                LOG.debug("Login by kerberos authentication with principal: {}", principal);
-                return ugi;
+                return new HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config).getUGI();
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
         } else {
-            String hadoopUserName = ((SimpleAuthenticationConfig) config).getUsername();
-            if (hadoopUserName == null) {
-                hadoopUserName = "hadoop";
-                ((SimpleAuthenticationConfig) config).setUsername(hadoopUserName);
-                LOG.debug(AuthenticationConfig.HADOOP_USER_NAME + " is unset, use default user: hadoop");
-            }
-
-            try {
-                ugi = UserGroupInformation.getLoginUser();
-                if (ugi.getUserName().equals(hadoopUserName)) {
-                    return ugi;
-                }
-            } catch (IOException e) {
-                LOG.warn("A SecurityException occurs with simple, do login immediately.", e);
-            }
-
-            ugi = UserGroupInformation.createRemoteUser(hadoopUserName);
-            UserGroupInformation.setLoginUser(ugi);
-            LOG.debug("Login by proxy user, hadoop.username: {}", hadoopUserName);
-            return ugi;
-        }
-    }
-
-    /**
-     * use for HMSExternalCatalog to login
-     * @param config auth config
-     */
-    public static void tryKrbLogin(String catalogName, AuthenticationConfig config) {
-        if (config instanceof KerberosAuthenticationConfig) {
-            KerberosAuthenticationConfig krbConfig = (KerberosAuthenticationConfig) config;
-            try {
-                Configuration hadoopConf = krbConfig.getConf();
-                hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-                hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED, "true");
-                UserGroupInformation.setConfiguration(hadoopConf);
-                /**
-                 * Because metastore client is created by using
-                 * {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient#getProxy}
-                 * it will relogin when TGT is expired, so we don't need to relogin manually.
-                 */
-                UserGroupInformation.loginUserFromKeytab(krbConfig.getKerberosPrincipal(),
-                        krbConfig.getKerberosKeytab());
-            } catch (IOException e) {
-                throw new RuntimeException("login with kerberos auth failed for catalog: " + catalogName, e);
-            }
+            return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) config).getUGI();
         }
     }
 
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java
similarity index 54%
copy from fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
copy to fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java
index 722cd0352b7..10e42f4bc67 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java
@@ -17,18 +17,27 @@
 
 package org.apache.doris.common.security.authentication;
 
-import lombok.Data;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 
-@Data
-public class KerberosAuthenticationConfig extends AuthenticationConfig {
-    private String kerberosPrincipal;
-    private String kerberosKeytab;
-    private Configuration conf;
+import java.io.IOException;
+import java.util.Objects;
+
+public class ImpersonatingHadoopAuthenticator implements HadoopAuthenticator {
+
+    private final HadoopAuthenticator delegate;
+    private final String username;
+    private UserGroupInformation ugi;
+
+    public ImpersonatingHadoopAuthenticator(HadoopAuthenticator delegate, String username) {
+        this.delegate = Objects.requireNonNull(delegate);
+        this.username = Objects.requireNonNull(username);
+    }
 
     @Override
-    public boolean isValid() {
-        return StringUtils.isNotEmpty(kerberosPrincipal) && StringUtils.isNotEmpty(kerberosKeytab);
+    public synchronized UserGroupInformation getUGI() throws IOException {
+        if (ugi == null) {
+            ugi = UserGroupInformation.createProxyUser(username, delegate.getUGI());
+        }
+        return ugi;
     }
 }
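
A minimal impersonation sketch (illustrative; the username "alice" is a placeholder and must be allowed by Hadoop's hadoop.proxyuser.* settings):

    HadoopAuthenticator real = HadoopAuthenticator.getHadoopAuthenticator(config);
    HadoopAuthenticator proxied = new ImpersonatingHadoopAuthenticator(real, "alice");
    UserGroupInformation proxyUgi = proxied.getUGI(); // proxy user backed by the delegate's UGI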
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
index 722cd0352b7..adf76274386 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
@@ -18,14 +18,17 @@
 package org.apache.doris.common.security.authentication;
 
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
+@EqualsAndHashCode(callSuper = true)
 @Data
 public class KerberosAuthenticationConfig extends AuthenticationConfig {
     private String kerberosPrincipal;
     private String kerberosKeytab;
     private Configuration conf;
+    private boolean printDebugLog;
 
     @Override
     public boolean isValid() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
index 1ac77972053..a5e0eefb348 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
@@ -18,6 +18,7 @@
 package org.apache.doris.datasource.hive;
 
 import org.apache.doris.analysis.TableName;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.DatabaseMetadata;
 import org.apache.doris.datasource.TableMetadata;
 import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
@@ -113,6 +114,10 @@ public interface HMSCachedClient {
 
     void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData);
 
+    default void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) {
+        // Ignored by default
+    }
+
     /**
      * close the connection, eg, to hms
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 91192b63c24..a22eacaf1e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -24,7 +24,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopUGI;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalDatabase;
@@ -40,7 +40,9 @@ import org.apache.doris.fs.FileSystemProviderImpl;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 import org.apache.doris.transaction.TransactionManagerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
+import lombok.Getter;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.logging.log4j.LogManager;
@@ -68,7 +70,10 @@ public class HMSExternalCatalog extends ExternalCatalog {
 
     private static final int FILE_SYSTEM_EXECUTOR_THREAD_NUM = 16;
     private ThreadPoolExecutor fileSystemExecutor;
+    @Getter
+    private HadoopAuthenticator authenticator;
 
+    @VisibleForTesting
     public HMSExternalCatalog() {
         catalogProperty = new CatalogProperty(null, null);
     }
@@ -81,6 +86,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
         super(catalogId, name, InitCatalogLog.Type.HMS, comment);
         props = PropertyConverter.convertToMetaProperties(props);
         catalogProperty = new CatalogProperty(resource, props);
+        AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration());
+        authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
     }
 
     @Override
@@ -148,9 +155,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
             }
             hiveConf.set(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(),
                     String.valueOf(Config.hive_metastore_client_timeout_second));
-            HadoopUGI.tryKrbLogin(this.getName(), AuthenticationConfig.getKerberosConfig(hiveConf,
-                    AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
-                    AuthenticationConfig.HADOOP_KERBEROS_KEYTAB));
         }
         HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
         FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index 7267297c93e..f438f5e1782 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.ExternalDatabase;
 import org.apache.doris.datasource.jdbc.client.JdbcClient;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
@@ -60,11 +61,14 @@ public class HiveMetadataOps implements ExternalMetadataOps {
     private static final int MIN_CLIENT_POOL_SIZE = 8;
     private final HMSCachedClient client;
     private final HMSExternalCatalog catalog;
+    private HadoopAuthenticator hadoopAuthenticator;
 
     public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) {
         this(catalog, createCachedClient(hiveConf,
                 Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size),
                 jdbcClientConfig));
+        hadoopAuthenticator = catalog.getAuthenticator();
+        client.setHadoopAuthenticator(hadoopAuthenticator);
     }
 
     @VisibleForTesting
@@ -84,7 +88,8 @@ public class HiveMetadataOps implements ExternalMetadataOps {
     private static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize,
             JdbcClientConfig jdbcClientConfig) {
         if (hiveConf != null) {
-            return new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize);
+            ThriftHMSCachedClient client = new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize);
+            return client;
         }
         Preconditions.checkNotNull(jdbcClientConfig, "hiveConf and jdbcClientConfig are both null");
         String dbType = JdbcClient.parseDbType(jdbcClientConfig.getJdbcUrl());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
index 55d8ffc2e02..7632101d7ff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.hive;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.DatabaseMetadata;
 import org.apache.doris.datasource.TableMetadata;
 import 
org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
@@ -92,6 +93,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
     private boolean isClosed = false;
     private final int poolSize;
     private final HiveConf hiveConf;
+    private HadoopAuthenticator hadoopAuthenticator;
 
     public ThriftHMSCachedClient(HiveConf hiveConf, int poolSize) {
         Preconditions.checkArgument(poolSize > 0, poolSize);
@@ -104,6 +106,10 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
         this.isClosed = false;
     }
 
+    public void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) {
+        this.hadoopAuthenticator = hadoopAuthenticator;
+    }
+
     @Override
     public void close() {
         synchronized (clientPool) {
@@ -678,7 +684,11 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
     }
 
     private <T> T ugiDoAs(PrivilegedExceptionAction<T> action) {
-        return HiveMetaStoreClientHelper.ugiDoAs(hiveConf, action);
+        try {
+            return hadoopAuthenticator.doAs(action);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
index 68de3a8fdef..3cb8a036c2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
@@ -56,7 +57,7 @@ public abstract class RemoteFileSystem extends PersistentFileSystem {
         try {
             org.apache.hadoop.fs.FileSystem fileSystem = nativeFileSystem(remotePath);
             Path locatedPath = new Path(remotePath);
-            RemoteIterator<LocatedFileStatus> locatedFiles = fileSystem.listFiles(locatedPath, recursive);
+            RemoteIterator<LocatedFileStatus> locatedFiles = getLocatedFiles(recursive, fileSystem, locatedPath);
             while (locatedFiles.hasNext()) {
                 LocatedFileStatus fileStatus = locatedFiles.next();
                 RemoteFile location = new RemoteFile(
@@ -72,11 +73,16 @@ public abstract class RemoteFileSystem extends PersistentFileSystem {
         return Status.OK;
     }
 
+    protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean recursive,
+                FileSystem fileSystem, Path locatedPath) throws IOException {
+        return fileSystem.listFiles(locatedPath, recursive);
+    }
+
     @Override
     public Status listDirectories(String remotePath, Set<String> result) {
         try {
             FileSystem fileSystem = nativeFileSystem(remotePath);
-            FileStatus[] fileStatuses = fileSystem.listStatus(new Path(remotePath));
+            FileStatus[] fileStatuses = getFileStatuses(remotePath, fileSystem);
             result.addAll(
                     Arrays.stream(fileStatuses)
                             .filter(FileStatus::isDirectory)
@@ -88,6 +94,10 @@ public abstract class RemoteFileSystem extends PersistentFileSystem {
         return Status.OK;
     }
 
+    protected FileStatus[] getFileStatuses(String remotePath, FileSystem fileSystem) throws IOException {
+        return fileSystem.listStatus(new Path(remotePath));
+    }
+
     @Override
     public Status renameDir(String origFilePath,
                             String destFilePath,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 5532f1187fe..59fbd73bda7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -21,7 +21,7 @@ import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopUGI;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.common.util.URI;
 import org.apache.doris.fs.operations.HDFSFileOperations;
 import org.apache.doris.fs.operations.HDFSOpParams;
@@ -35,7 +35,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -59,6 +61,7 @@ public class DFSFileSystem extends RemoteFileSystem {
     public static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed";
     private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class);
     private HDFSFileOperations operations = null;
+    private HadoopAuthenticator authenticator = null;
 
     public DFSFileSystem(Map<String, String> properties) {
         this(StorageBackend.StorageType.HDFS, properties);
@@ -79,14 +82,19 @@ public class DFSFileSystem extends RemoteFileSystem {
                     for (Map.Entry<String, String> propEntry : properties.entrySet()) {
                         conf.set(propEntry.getKey(), propEntry.getValue());
                     }
-
-                    dfsFileSystem = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> {
-                        try {
-                            return FileSystem.get(new Path(remotePath).toUri(), conf);
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    });
+                    AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf);
+                    authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig);
+                    try {
+                        dfsFileSystem = authenticator.doAs(() -> {
+                            try {
+                                return FileSystem.get(new Path(remotePath).toUri(), conf);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+                    } catch (Exception e) {
+                        throw new UserException(e);
+                    }
                     operations = new HDFSFileOperations(dfsFileSystem);
                 }
             }
@@ -94,6 +102,15 @@ public class DFSFileSystem extends RemoteFileSystem {
         return dfsFileSystem;
     }
 
+    protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean recursive,
+                FileSystem fileSystem, Path locatedPath) throws IOException {
+        return authenticator.doAs(() -> fileSystem.listFiles(locatedPath, recursive));
+    }
+
+    protected FileStatus[] getFileStatuses(String remotePath, FileSystem fileSystem) throws IOException {
+        return authenticator.doAs(() -> fileSystem.listStatus(new Path(remotePath)));
+    }
+
     public static Configuration getHdfsConf(boolean fallbackToSimpleAuth) {
         Configuration hdfsConf = new HdfsConfiguration();
         if (fallbackToSimpleAuth) {
@@ -265,7 +282,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             URI pathUri = URI.create(remotePath);
             Path inputFilePath = new Path(pathUri.getPath());
             FileSystem fileSystem = nativeFileSystem(remotePath);
-            boolean isPathExist = fileSystem.exists(inputFilePath);
+            boolean isPathExist = authenticator.doAs(() -> fileSystem.exists(inputFilePath));
             if (!isPathExist) {
                 return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath);
             }
@@ -380,7 +397,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             FileSystem fileSystem = nativeFileSystem(destPath);
             Path srcfilePath = new Path(srcPathUri.getPath());
             Path destfilePath = new Path(destPathUri.getPath());
-            boolean isRenameSuccess = fileSystem.rename(srcfilePath, destfilePath);
+            boolean isRenameSuccess = authenticator.doAs(() -> fileSystem.rename(srcfilePath, destfilePath));
             if (!isRenameSuccess) {
                 return new Status(Status.ErrCode.COMMON_ERROR, "failed to rename " + srcPath + " to " + destPath);
             }
@@ -401,7 +418,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             URI pathUri = URI.create(remotePath);
             Path inputFilePath = new Path(pathUri.getPath());
             FileSystem fileSystem = nativeFileSystem(remotePath);
-            fileSystem.delete(inputFilePath, true);
+            authenticator.doAs(() -> fileSystem.delete(inputFilePath, true));
         } catch (UserException e) {
             return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
         } catch (IOException e) {
@@ -427,7 +444,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             URI pathUri = URI.create(remotePath);
             FileSystem fileSystem = nativeFileSystem(remotePath);
             Path pathPattern = new Path(pathUri.getPath());
-            FileStatus[] files = fileSystem.globStatus(pathPattern);
+            FileStatus[] files = authenticator.doAs(() -> fileSystem.globStatus(pathPattern));
             if (files == null) {
                 LOG.info("no files in path " + remotePath);
                 return Status.OK;
@@ -454,7 +471,7 @@ public class DFSFileSystem extends RemoteFileSystem {
     public Status makeDir(String remotePath) {
         try {
             FileSystem fileSystem = nativeFileSystem(remotePath);
-            if (!fileSystem.mkdirs(new Path(remotePath))) {
+            if (!authenticator.doAs(() -> fileSystem.mkdirs(new Path(remotePath)))) {
                 LOG.warn("failed to make dir for " + remotePath);
                 return new Status(Status.ErrCode.COMMON_ERROR, "failed to make dir for " + remotePath);
             }
diff --git a/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy b/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy
index a3b39d1221a..7e7f276236a 100644
--- a/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy
+++ b/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy
@@ -1,3 +1,5 @@
+import groovyjarjarantlr4.v4.codegen.model.ExceptionClause
+
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,6 +17,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import org.junit.Assert;
+
 suite("test_two_hive_kerberos", 
"p0,external,kerberos,external_docker,external_docker_kerberos") {
     String enabled = context.config.otherConfigs.get("enableKerberosTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
@@ -66,7 +70,36 @@ suite("test_two_hive_kerberos", "p0,external,kerberos,external_d
         sql """ use test_krb_hive_db """
         order_qt_q02 """ select * from test_krb_hive_db.test_krb_hive_tbl """
 
+        // 3. multi thread test
+        Thread thread1 = new Thread(() -> {
+            try {
+                for (int i = 0; i < 1000; i++) {
+                    sql """ select * from ${hms_catalog_name}.test_krb_hive_db.test_krb_hive_tbl """
+                }
+            } catch (Exception e) {
+                log.info(e.getMessage())
+                Assert.fail();
+            }
+        })
+
+        Thread thread2 = new Thread(() -> {
+            try {
+                for (int i = 0; i < 1000; i++) {
+                    sql """ select * from other_${hms_catalog_name}.test_krb_hive_db.test_krb_hive_tbl """
+                }
+            } catch (Exception e) {
+                log.info(e.getMessage())
+                Assert.fail();
+            }
+        })
+        sleep(5000L)
+        thread1.start()
+        thread2.start()
+
+        thread1.join()
+        thread2.join()
         sql """drop catalog ${hms_catalog_name};"""
         sql """drop catalog other_${hms_catalog_name};"""
+        // TODO: add tvf case
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

