This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 13cb81a724 [fix](broker) Fix bug that heavy broker load may fail due to BrokerException which indicates the fd is not owned by client (#16350) 13cb81a724 is described below commit 13cb81a7243e634827ffbfcc9649ee3fa13192a1 Author: caiconghui <55968745+caicong...@users.noreply.github.com> AuthorDate: Fri Feb 3 15:06:45 2023 +0800 [fix](broker) Fix bug that heavy broker load may fail due to BrokerException which indicates the fd is not owned by client (#16350) Co-authored-by: caiconghui1 <caicongh...@jd.com> --- .../apache/doris/broker/hdfs/BrokerFileSystem.java | 2 +- .../doris/broker/hdfs/ClientContextManager.java | 23 +++++++++++++++------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java index c8a217d205..22f6925d31 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java @@ -31,7 +31,7 @@ public class BrokerFileSystem { private ReentrantLock lock; private FileSystemIdentity identity; private FileSystem dfsFileSystem; - private long lastAccessTimestamp; + private volatile long lastAccessTimestamp; private long createTimestamp; private UUID fileSystemId; diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java index e52f248e11..736f9ae448 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java @@ -131,7 +131,7 @@ public class 
ClientContextManager { public void run() { try { for (ClientResourceContext clientContext : clientContexts.values()) { - if (System.currentTimeMillis() - clientContext.lastPingTimestamp > clientExpirationSeconds * 1000) { + if (System.currentTimeMillis() - clientContext.lastAccessTimestamp > clientExpirationSeconds * 1000) { for (TBrokerFD fd : clientContext.inputStreams.keySet()) { ClientContextManager.this.removeInputStream(fd); } @@ -139,9 +139,9 @@ public class ClientContextManager { ClientContextManager.this.removeOutputStream(fd); } clientContexts.remove(clientContext.clientId); - logger.info("client [" + clientContext.clientId - + "] is expired, remove it from contexts. last ping time is " - + clientContext.lastPingTimestamp); + logger.info("client [" + clientContext.clientId + + "] is expired, remove it from contexts. last access time is " + + clientContext.lastAccessTimestamp); } } } finally { @@ -197,24 +197,28 @@ public class ClientContextManager { private String clientId; private ConcurrentHashMap<TBrokerFD, BrokerInputStream> inputStreams; private ConcurrentHashMap<TBrokerFD, BrokerOutputStream> outputStreams; - private long lastPingTimestamp; + + private volatile long lastAccessTimestamp; public ClientResourceContext(String clientId) { this.clientId = clientId; this.inputStreams = new ConcurrentHashMap<>(); this.outputStreams = new ConcurrentHashMap<>(); - this.lastPingTimestamp = System.currentTimeMillis(); + this.lastAccessTimestamp = System.currentTimeMillis(); } public void putInputStream(TBrokerFD fd, FSDataInputStream inputStream, BrokerFileSystem fileSystem) { + updateLastAccessTime(); inputStreams.putIfAbsent(fd, new BrokerInputStream(inputStream, fileSystem)); } public void putOutputStream(TBrokerFD fd, FSDataOutputStream outputStream, BrokerFileSystem fileSystem) { + updateLastAccessTime(); outputStreams.putIfAbsent(fd, new BrokerOutputStream(outputStream, fileSystem)); } public FSDataInputStream getInputStream(TBrokerFD fd) { + 
updateLastAccessTime(); BrokerInputStream brokerInputStream = inputStreams.get(fd); if (brokerInputStream != null) { return brokerInputStream.getInputStream(); @@ -223,6 +227,7 @@ public class ClientContextManager { } public FSDataOutputStream getOutputStream(TBrokerFD fd) { + updateLastAccessTime(); BrokerOutputStream brokerOutputStream = outputStreams.get(fd); if (brokerOutputStream != null) { return brokerOutputStream.getOutputStream(); @@ -230,8 +235,12 @@ public class ClientContextManager { return null; } + public void updateLastAccessTime() { + this.lastAccessTimestamp = System.currentTimeMillis(); + } + public void updateLastPingTime() { - this.lastPingTimestamp = System.currentTimeMillis(); + this.lastAccessTimestamp = System.currentTimeMillis(); // Should we also update the underline filesystem? maybe it is time cost for (BrokerInputStream brokerInputStream : inputStreams.values()) { brokerInputStream.updateLastUpdateAccessTime(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org