This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 00dda79735 [fix](broker-load) Correction of kerberos authentication time determination rule (#11793) 00dda79735 is described below commit 00dda79735bff74304987007e547ff7585296811 Author: HB <hubia...@corp.netease.com> AuthorDate: Sun Sep 18 17:46:13 2022 +0800 [fix](broker-load) Correction of kerberos authentication time determination rule (#11793) Every time a new broker load comes in, Doris will update the start time of Kerberos authentication, but this logic is wrong, because the authentication duration of Kerberos is calculated from the moment when the ticket is obtained. This PR changes the logic: 1. If it is Kerberos, check fs expiration by create time. 2. Otherwise, check fs expiration by access time. --- .../conf/apache_hdfs_broker.conf | 2 +- .../org/apache/doris/broker/hdfs/BrokerConfig.java | 12 +-- .../apache/doris/broker/hdfs/BrokerFileSystem.java | 34 ++++--- .../doris/broker/hdfs/ClientContextManager.java | 78 +++++++-------- .../doris/broker/hdfs/FileSystemManager.java | 108 ++++++++++++--------- 5 files changed, 127 insertions(+), 107 deletions(-) diff --git a/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf b/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf index 8466de02cd..e023067cf0 100644 --- a/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf +++ b/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf @@ -27,7 +27,7 @@ broker_ipc_port = 8000 # client session will be deleted if not receive ping after this time -client_expire_seconds = 1800 +client_expire_seconds = 3600 # Advanced configurations # sys_log_dir = ${BROKER_HOME}/log diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java index 6381f16db2..aa91b9d393 100644 --- 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java @@ -21,19 +21,19 @@ import org.apache.doris.common.ConfigBase; public class BrokerConfig extends ConfigBase { - + @ConfField public static int hdfs_read_buffer_size_kb = 1024; - + @ConfField public static int hdfs_write_buffer_size_kb = 1024; - + @ConfField - public static int client_expire_seconds = 1800; - + public static int client_expire_seconds = 3600; + @ConfField public static int broker_ipc_port = 8000; - + @ConfField public static String sys_log_dir = System.getenv("BROKER_HOME") + "/log"; @ConfField diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java index 7b9f6cc679..c8a217d205 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java @@ -27,26 +27,29 @@ public class BrokerFileSystem { private static Logger logger = Logger .getLogger(BrokerFileSystem.class.getName()); - + private ReentrantLock lock; private FileSystemIdentity identity; private FileSystem dfsFileSystem; private long lastAccessTimestamp; + private long createTimestamp; private UUID fileSystemId; - + public BrokerFileSystem(FileSystemIdentity identity) { this.identity = identity; this.lock = new ReentrantLock(); this.dfsFileSystem = null; this.lastAccessTimestamp = System.currentTimeMillis(); + this.createTimestamp = System.currentTimeMillis(); this.fileSystemId = UUID.randomUUID(); } - + public synchronized void setFileSystem(FileSystem fileSystem) { this.dfsFileSystem = fileSystem; this.lastAccessTimestamp = System.currentTimeMillis(); + this.createTimestamp = System.currentTimeMillis(); } - + public 
void closeFileSystem() { lock.lock(); try { @@ -63,31 +66,32 @@ public class BrokerFileSystem { lock.unlock(); } } - + public FileSystem getDFSFileSystem() { this.lastAccessTimestamp = System.currentTimeMillis(); return dfsFileSystem; } - + public void updateLastUpdateAccessTime() { this.lastAccessTimestamp = System.currentTimeMillis(); } - + public FileSystemIdentity getIdentity() { return identity; } - + public ReentrantLock getLock() { return lock; } - - public boolean isExpired(long expirationIntervalSecs) { - if (System.currentTimeMillis() - lastAccessTimestamp > expirationIntervalSecs * 1000) { - return true; - } - return false; + + public boolean isExpiredByLastAccessTime(long expirationIntervalSecs) { + return System.currentTimeMillis() - lastAccessTimestamp > expirationIntervalSecs * 1000; } - + + public boolean isExpiredByCreateTime(long expirationIntervalSecs) { + return System.currentTimeMillis() - createTimestamp > expirationIntervalSecs * 1000; + } + @Override public String toString() { return "BrokerFileSystem [identity=" + identity + ", dfsFileSystem=" diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java index 59f0e974b4..e52f248e11 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java @@ -31,19 +31,19 @@ import org.apache.doris.thrift.TBrokerOperationStatusCode; public class ClientContextManager { private static Logger logger = Logger - .getLogger(ClientContextManager.class.getName()); + .getLogger(ClientContextManager.class.getName()); private ScheduledExecutorService executorService; private ConcurrentHashMap<String, ClientResourceContext> clientContexts; private ConcurrentHashMap<TBrokerFD, String> fdToClientMap; private int 
clientExpirationSeconds = BrokerConfig.client_expire_seconds; - + public ClientContextManager(ScheduledExecutorService executorService) { clientContexts = new ConcurrentHashMap<>(); fdToClientMap = new ConcurrentHashMap<>(); this.executorService = executorService; this.executorService.schedule(new CheckClientExpirationTask(), 0, TimeUnit.SECONDS); } - + public void onPing(String clientId) { if (!clientContexts.containsKey(clientId)) { clientContexts.putIfAbsent(clientId, new ClientResourceContext(clientId)); @@ -51,9 +51,9 @@ public class ClientContextManager { ClientResourceContext clientContext = clientContexts.get(clientId); clientContext.updateLastPingTime(); } - - public synchronized void putNewOutputStream(String clientId, TBrokerFD fd, FSDataOutputStream fsDataOutputStream, - BrokerFileSystem brokerFileSystem) { + + public synchronized void putNewOutputStream(String clientId, TBrokerFD fd, FSDataOutputStream fsDataOutputStream, + BrokerFileSystem brokerFileSystem) { if (!clientContexts.containsKey(clientId)) { clientContexts.putIfAbsent(clientId, new ClientResourceContext(clientId)); } @@ -61,9 +61,9 @@ public class ClientContextManager { clientContext.putOutputStream(fd, fsDataOutputStream, brokerFileSystem); fdToClientMap.putIfAbsent(fd, clientId); } - - public synchronized void putNewInputStream(String clientId, TBrokerFD fd, FSDataInputStream fsDataInputStream, - BrokerFileSystem brokerFileSystem) { + + public synchronized void putNewInputStream(String clientId, TBrokerFD fd, FSDataInputStream fsDataInputStream, + BrokerFileSystem brokerFileSystem) { if (!clientContexts.containsKey(clientId)) { clientContexts.putIfAbsent(clientId, new ClientResourceContext(clientId)); } @@ -71,29 +71,29 @@ public class ClientContextManager { clientContext.putInputStream(fd, fsDataInputStream, brokerFileSystem); fdToClientMap.putIfAbsent(fd, clientId); } - + public synchronized FSDataInputStream getFsDataInputStream(TBrokerFD fd) { String clientId = fdToClientMap.get(fd); 
if (clientId == null) { - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, - "the fd is not owned by client {}", clientId); + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + "the fd is not owned by client {}", clientId); } ClientResourceContext clientContext = clientContexts.get(clientId); FSDataInputStream fsDataInputStream = clientContext.getInputStream(fd); return fsDataInputStream; } - + public synchronized FSDataOutputStream getFsDataOutputStream(TBrokerFD fd) { String clientId = fdToClientMap.get(fd); if (clientId == null) { - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, - "the fd is not owned by client {}", clientId); + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + "the fd is not owned by client {}", clientId); } ClientResourceContext clientContext = clientContexts.get(clientId); FSDataOutputStream fsDataOutputStream = clientContext.getOutputStream(fd); return fsDataOutputStream; } - + public synchronized void removeInputStream(TBrokerFD fd) { String clientId = fdToClientMap.remove(fd); if (clientId == null) { @@ -109,7 +109,7 @@ public class ClientContextManager { logger.error("errors while close file data input stream", e); } } - + public synchronized void removeOutputStream(TBrokerFD fd) { String clientId = fdToClientMap.remove(fd); if (clientId == null) { @@ -125,7 +125,7 @@ public class ClientContextManager { logger.error("errors while close file data output stream", e); } } - + class CheckClientExpirationTask implements Runnable { @Override public void run() { @@ -139,9 +139,9 @@ public class ClientContextManager { ClientContextManager.this.removeOutputStream(fd); } clientContexts.remove(clientContext.clientId); - logger.info("client [" + clientContext.clientId - + "] is expired, remove it from contexts. 
last ping time is " - + clientContext.lastPingTimestamp); + logger.info("client [" + clientContext.clientId + + "] is expired, remove it from contexts. last ping time is " + + clientContext.lastPingTimestamp); } } } finally { @@ -149,71 +149,71 @@ public class ClientContextManager { } } } - + private static class BrokerOutputStream { - + private final FSDataOutputStream outputStream; private final BrokerFileSystem brokerFileSystem; - + public BrokerOutputStream(FSDataOutputStream outputStream, BrokerFileSystem brokerFileSystem) { this.outputStream = outputStream; this.brokerFileSystem = brokerFileSystem; this.brokerFileSystem.updateLastUpdateAccessTime(); } - + public FSDataOutputStream getOutputStream() { this.brokerFileSystem.updateLastUpdateAccessTime(); return outputStream; } - + public void updateLastUpdateAccessTime() { this.brokerFileSystem.updateLastUpdateAccessTime(); } } - + private static class BrokerInputStream { - + private final FSDataInputStream inputStream; private final BrokerFileSystem brokerFileSystem; - + public BrokerInputStream(FSDataInputStream inputStream, BrokerFileSystem brokerFileSystem) { this.inputStream = inputStream; this.brokerFileSystem = brokerFileSystem; this.brokerFileSystem.updateLastUpdateAccessTime(); } - + public FSDataInputStream getInputStream() { this.brokerFileSystem.updateLastUpdateAccessTime(); return inputStream; } - + public void updateLastUpdateAccessTime() { this.brokerFileSystem.updateLastUpdateAccessTime(); } } - + static class ClientResourceContext { private String clientId; private ConcurrentHashMap<TBrokerFD, BrokerInputStream> inputStreams; private ConcurrentHashMap<TBrokerFD, BrokerOutputStream> outputStreams; private long lastPingTimestamp; - + public ClientResourceContext(String clientId) { this.clientId = clientId; this.inputStreams = new ConcurrentHashMap<>(); this.outputStreams = new ConcurrentHashMap<>(); this.lastPingTimestamp = System.currentTimeMillis(); } - + public void putInputStream(TBrokerFD fd, 
FSDataInputStream inputStream, BrokerFileSystem fileSystem) { inputStreams.putIfAbsent(fd, new BrokerInputStream(inputStream, fileSystem)); } - + public void putOutputStream(TBrokerFD fd, FSDataOutputStream outputStream, BrokerFileSystem fileSystem) { outputStreams.putIfAbsent(fd, new BrokerOutputStream(outputStream, fileSystem)); } - + public FSDataInputStream getInputStream(TBrokerFD fd) { BrokerInputStream brokerInputStream = inputStreams.get(fd); if (brokerInputStream != null) { @@ -221,7 +221,7 @@ public class ClientContextManager { } return null; } - + public FSDataOutputStream getOutputStream(TBrokerFD fd) { BrokerOutputStream brokerOutputStream = outputStreams.get(fd); if (brokerOutputStream != null) { @@ -229,14 +229,14 @@ public class ClientContextManager { } return null; } - + public void updateLastPingTime() { this.lastPingTimestamp = System.currentTimeMillis(); // Should we also update the underline filesystem? maybe it is time cost for (BrokerInputStream brokerInputStream : inputStreams.values()) { brokerInputStream.updateLastUpdateAccessTime(); } - + for (BrokerOutputStream brokerOutputStream : outputStreams.values()) { brokerOutputStream.updateLastUpdateAccessTime(); } diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java index 262875de25..c6974e70a2 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java @@ -58,7 +58,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; public class FileSystemManager { @@ -108,13 +107,13 @@ public class FileSystemManager { private static final String 
FS_KS3_IMPL_DISABLE_CACHE = "fs.ks3.impl.disable.cache"; private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2); - + private int readBufferSize = 128 << 10; // 128k private int writeBufferSize = 128 << 10; // 128k - + private ConcurrentHashMap<FileSystemIdentity, BrokerFileSystem> cachedFileSystem; private ClientContextManager clientContextManager; - + public FileSystemManager() { cachedFileSystem = new ConcurrentHashMap<>(); clientContextManager = new ClientContextManager(handleManagementPool); @@ -140,15 +139,15 @@ public class FileSystemManager { return finalPrincipal; } - + /** * visible for test - * + * * @param path * @param properties * @return BrokerFileSystem with different FileSystem based on scheme - * @throws URISyntaxException - * @throws Exception + * @throws URISyntaxException + * @throws Exception */ public BrokerFileSystem getFileSystem(String path, Map<String, String> properties) { WildcardURI pathUri = new WildcardURI(path); @@ -240,7 +239,7 @@ public class FileSystemManager { e.getMessage()); } } - BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity); + BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties); fileSystem.getLock().lock(); try { if (fileSystem.getDFSFileSystem() == null) { @@ -394,8 +393,7 @@ public class FileSystemManager { String disableCache = properties.getOrDefault(FS_S3A_IMPL_DISABLE_CACHE, "true"); String s3aUgi = accessKey + "," + secretKey; FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, s3aUgi); - cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity)); - BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity); + BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties); fileSystem.getLock().lock(); try { if (fileSystem.getDFSFileSystem() == null) { @@ -440,7 +438,7 @@ public class FileSystemManager { String host = KS3_SCHEME + "://" + 
endpoint + "/" + pathUri.getUri().getHost(); String ks3aUgi = accessKey + "," + secretKey; FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, ks3aUgi); - BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity); + BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties); fileSystem.getLock().lock(); try { if (fileSystem.getDFSFileSystem() == null) { @@ -516,7 +514,7 @@ public class FileSystemManager { e.getMessage()); } } - BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity); + BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties); fileSystem.getLock().lock(); try { // create a new filesystem @@ -618,12 +616,12 @@ public class FileSystemManager { } catch (Exception e) { logger.error("errors while get file status ", e); fileSystem.closeFileSystem(); - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, e, "unknown error when get file status"); } return resultFileStatus; } - + public void deletePath(String path, Map<String, String> properties) { WildcardURI pathUri = new WildcardURI(path); BrokerFileSystem fileSystem = getFileSystem(path, properties); @@ -633,16 +631,16 @@ public class FileSystemManager { } catch (IOException e) { logger.error("errors while delete path " + path); fileSystem.closeFileSystem(); - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, e, "delete path {} error", path); } } - + public void renamePath(String srcPath, String destPath, Map<String, String> properties) { WildcardURI srcPathUri = new WildcardURI(srcPath); WildcardURI destPathUri = new WildcardURI(destPath); if (!srcPathUri.getAuthority().trim().equals(destPathUri.getAuthority().trim())) { - throw new 
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, "only allow rename in same file system"); } BrokerFileSystem fileSystem = getFileSystem(srcPath, properties); @@ -651,17 +649,17 @@ public class FileSystemManager { try { boolean isRenameSuccess = fileSystem.getDFSFileSystem().rename(srcfilePath, destfilePath); if (!isRenameSuccess) { - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, - "failed to rename path from {} to {}", srcPath, destPath); + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + "failed to rename path from {} to {}", srcPath, destPath); } } catch (IOException e) { logger.error("errors while rename path from " + srcPath + " to " + destPath); fileSystem.closeFileSystem(); - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, e, "errors while rename {} to {}", srcPath, destPath); } } - + public boolean checkPathExist(String path, Map<String, String> properties) { WildcardURI pathUri = new WildcardURI(path); BrokerFileSystem fileSystem = getFileSystem(path, properties); @@ -672,11 +670,11 @@ public class FileSystemManager { } catch (IOException e) { logger.error("errors while check path exist: " + path); fileSystem.closeFileSystem(); - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, e, "errors while check if path {} exist", path); } } - + public TBrokerFD openReader(String clientId, String path, long startOffset, Map<String, String> properties) { WildcardURI pathUri = new WildcardURI(path); Path inputFilePath = new Path(pathUri.getPath()); @@ -691,11 +689,11 @@ public class FileSystemManager { } catch (IOException e) { logger.error("errors while 
open path", e); fileSystem.closeFileSystem(); - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, e, "could not open file {}", path); } } - + public ByteBuffer pread(TBrokerFD fd, long offset, long length) { FSDataInputStream fsDataInputStream = clientContextManager.getFsDataInputStream(fd); synchronized (fsDataInputStream) { @@ -746,12 +744,12 @@ public class FileSystemManager { } } } - + public void seek(TBrokerFD fd, long offset) { - throw new BrokerException(TBrokerOperationStatusCode.OPERATION_NOT_SUPPORTED, + throw new BrokerException(TBrokerOperationStatusCode.OPERATION_NOT_SUPPORTED, "seek this method is not supported"); } - + public void closeReader(TBrokerFD fd) { FSDataInputStream fsDataInputStream = clientContextManager.getFsDataInputStream(fd); synchronized (fsDataInputStream) { @@ -766,13 +764,13 @@ public class FileSystemManager { } } } - + public TBrokerFD openWriter(String clientId, String path, Map<String, String> properties) { WildcardURI pathUri = new WildcardURI(path); Path inputFilePath = new Path(pathUri.getPath()); BrokerFileSystem fileSystem = getFileSystem(path, properties); try { - FSDataOutputStream fsDataOutputStream = fileSystem.getDFSFileSystem().create(inputFilePath, + FSDataOutputStream fsDataOutputStream = fileSystem.getDFSFileSystem().create(inputFilePath, true, writeBufferSize); UUID uuid = UUID.randomUUID(); TBrokerFD fd = parseUUIDToFD(uuid); @@ -781,11 +779,11 @@ public class FileSystemManager { } catch (IOException e) { logger.error("errors while open path", e); fileSystem.closeFileSystem(); - throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, e, "could not open file {}", path); } } - + public void pwrite(TBrokerFD fd, long offset, byte[] data) { FSDataOutputStream fsDataOutputStream = 
clientContextManager.getFsDataOutputStream(fd); synchronized (fsDataOutputStream) { @@ -811,7 +809,7 @@ public class FileSystemManager { } } } - + public void closeWriter(TBrokerFD fd) { FSDataOutputStream fsDataOutputStream = clientContextManager.getFsDataOutputStream(fd); synchronized (fsDataOutputStream) { @@ -827,11 +825,11 @@ public class FileSystemManager { } } } - + public void ping(String clientId) { clientContextManager.onPing(clientId); } - + private static TBrokerFD parseUUIDToFD(UUID uuid) { return new TBrokerFD(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); } @@ -849,23 +847,41 @@ public class FileSystemManager { return readLength; } - private BrokerFileSystem updateCachedFileSystem(FileSystemIdentity fileSystemIdentity) { - BrokerFileSystem brokerFileSystem = null; + /** + * In view of the different expiration mechanisms of different authentication modes, + * there are two ways to determine whether BrokerFileSystem has expired: + * 1. For the authentication mode of Kerberos and S3 aksk, use the createTime to determine whether it expires + * 2. 
For other authentication modes, the lastAccessTime is used to determine whether it has expired + */ + private BrokerFileSystem updateCachedFileSystem(FileSystemIdentity fileSystemIdentity, Map<String, String> properties) { + BrokerFileSystem brokerFileSystem; if (cachedFileSystem.containsKey(fileSystemIdentity)) { brokerFileSystem = cachedFileSystem.get(fileSystemIdentity); - if (brokerFileSystem.isExpired(BrokerConfig.client_expire_seconds)) { - logger.info("file system " + brokerFileSystem + " is expired, close and remove it"); + if (properties.containsKey(KERBEROS_KEYTAB) && properties.containsKey(KERBEROS_PRINCIPAL)) { + if (brokerFileSystem.isExpiredByCreateTime(BrokerConfig.client_expire_seconds)) { + logger.info("file system " + brokerFileSystem + " is expired, update it."); + try { + Configuration conf = new HdfsConfiguration(); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, AUTHENTICATION_KERBEROS); + UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI( + preparePrincipal(properties.get(KERBEROS_PRINCIPAL)), properties.get(KERBEROS_KEYTAB)); + // update FileSystem TGT + ugi.checkTGTAndReloginFromKeytab(); + } catch (Exception e) { + logger.error("errors while checkTGTAndReloginFromKeytab: ", e); + } + } + } else if (brokerFileSystem.isExpiredByLastAccessTime(BrokerConfig.client_expire_seconds)) { brokerFileSystem.getLock().lock(); try { + logger.info("file system " + brokerFileSystem + " is expired, update it."); brokerFileSystem.closeFileSystem(); - } catch (Throwable t) { - logger.error("errors while close file system", t); - } finally { - cachedFileSystem.remove(brokerFileSystem.getIdentity()); brokerFileSystem.getLock().unlock(); - brokerFileSystem = new BrokerFileSystem(fileSystemIdentity); - cachedFileSystem.put(fileSystemIdentity, brokerFileSystem); + } catch (Throwable t) { + logger.error("errors while close file system: ", t); } + brokerFileSystem = new BrokerFileSystem(fileSystemIdentity); + 
cachedFileSystem.put(fileSystemIdentity, brokerFileSystem); } } else { brokerFileSystem = new BrokerFileSystem(fileSystemIdentity); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org