This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 884c908e25 [Enhancement](multi-catalog) try to reuse existed ugi. (#21274) 884c908e25 is described below commit 884c908e25be74af734eb6b370708fb4524e177e Author: Xiangyu Wang <dut.xian...@gmail.com> AuthorDate: Thu Jun 29 09:04:59 2023 +0800 [Enhancement](multi-catalog) try to reuse existed ugi. (#21274) Try to reuse an existing UGI in DFSFileSystem. Otherwise, when querying an HMS table with more than ten thousand partitions, we perform more than ten thousand login operations, and in my tests each login operation costs hundreds of milliseconds. Co-authored-by: 王翔宇 <wangxian...@360shuke.com> --- .../org/apache/doris/catalog/HdfsResource.java | 1 + .../apache/doris/fs/remote/RemoteFileSystem.java | 3 +- .../apache/doris/fs/remote/dfs/DFSFileSystem.java | 70 ++++++++++++++-------- 3 files changed, 49 insertions(+), 25 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java index d812d8ec53..cdfb169590 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java @@ -48,6 +48,7 @@ public class HdfsResource extends Resource { public static String HADOOP_USER_NAME = "hadoop.username"; public static String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication"; public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal"; + public static String HADOOP_KERBEROS_AUTHORIZATION = "hadoop.security.authorization"; public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab"; public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit"; public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java index
7d87993733..ffe63f20ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java @@ -31,7 +31,8 @@ import java.util.ArrayList; import java.util.List; public abstract class RemoteFileSystem extends PersistentFileSystem { - protected org.apache.hadoop.fs.FileSystem dfsFileSystem = null; + // this field will be visited by multi-threads, better use volatile qualifier + protected volatile org.apache.hadoop.fs.FileSystem dfsFileSystem = null; public RemoteFileSystem(String name, StorageBackend.StorageType type) { super(name, type); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index 9f72595ad8..ce297ce920 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -29,6 +29,7 @@ import org.apache.doris.fs.operations.OpParams; import org.apache.doris.fs.remote.RemoteFile; import org.apache.doris.fs.remote.RemoteFileSystem; +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -54,7 +55,6 @@ import java.security.PrivilegedAction; import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; public class DFSFileSystem extends RemoteFileSystem { @@ -82,42 +82,61 @@ public class DFSFileSystem extends RemoteFileSystem { conf.set(propEntry.getKey(), propEntry.getValue()); } - UserGroupInformation ugi = getUgi(conf); - AtomicReference<Exception> exception = new AtomicReference<>(); + boolean hasRelogin = false; + UserGroupInformation ugi; + try { + // try use current ugi first to avoid relogin + // because it may be a time-consuming task + ugi = 
UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + LOG.warn("An IOException occurs when invoke " + + "UserGroupInformation.getCurrentUser(), relogin immediately.", e); + ugi = doLogin(conf); + hasRelogin = true; + } - dfsFileSystem = ugi.doAs((PrivilegedAction<FileSystem>) () -> { + do { try { - String username = properties.get(HdfsResource.HADOOP_USER_NAME); - if (username == null) { - return FileSystem.get(new Path(remotePath).toUri(), conf); - } else { - return FileSystem.get(new Path(remotePath).toUri(), conf, username); + dfsFileSystem = ugi.doAs((PrivilegedAction<FileSystem>) () -> { + try { + String username = properties.get(HdfsResource.HADOOP_USER_NAME); + return username == null + ? FileSystem.get(new Path(remotePath).toUri(), conf) + : FileSystem.get(new Path(remotePath).toUri(), conf, username); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + LOG.debug("Reuse current ugi for dfs, remote path: {}", remotePath); + break; + } catch (SecurityException e) { + LOG.warn("A SecurityException occurs when invoke ugi.doAs(), " + + "relogin and retry immediately.", e); + if (hasRelogin) { + throw new UserException(e); } - } catch (Exception e) { - exception.set(e); - return null; + ugi = doLogin(conf); + hasRelogin = true; } - }); + } while (true); - if (dfsFileSystem == null) { - LOG.error("errors while connect to " + remotePath, exception.get()); - throw new UserException("errors while connect to " + remotePath, exception.get()); - } + Preconditions.checkNotNull(dfsFileSystem); operations = new HDFSFileOperations(dfsFileSystem); return dfsFileSystem; } - private UserGroupInformation getUgi(Configuration conf) throws UserException { - String authentication = conf.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, null); - if (AuthType.KERBEROS.getDesc().equals(authentication)) { - conf.set("hadoop.security.authorization", "true"); - UserGroupInformation.setConfiguration(conf); + private 
UserGroupInformation doLogin(Configuration conf) throws UserException { + if (AuthType.KERBEROS.getDesc().equals( + conf.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, null))) { + conf.set(HdfsResource.HADOOP_KERBEROS_AUTHORIZATION, "true"); String principal = conf.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL); String keytab = conf.get(HdfsResource.HADOOP_KERBEROS_KEYTAB); + + UserGroupInformation.setConfiguration(conf); try { UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab); UserGroupInformation.setLoginUser(ugi); - LOG.info("kerberos authentication successful"); + LOG.info("Login by kerberos authentication with principal: {}", principal); return ugi; } catch (IOException e) { throw new UserException(e); @@ -128,7 +147,10 @@ public class DFSFileSystem extends RemoteFileSystem { hadoopUserName = "hadoop"; LOG.debug(HdfsResource.HADOOP_USER_NAME + " is unset, use default user: hadoop"); } - return UserGroupInformation.createRemoteUser(hadoopUserName); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hadoopUserName); + UserGroupInformation.setLoginUser(ugi); + LOG.info("Login by proxy user, hadoop.username: {}", hadoopUserName); + return ugi; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org