This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 13cb81a724 [fix](broker) Fix bug that heavy broker load may failed due to BrokerException which indicate the fd is not owned by client (#16350)
13cb81a724 is described below

commit 13cb81a7243e634827ffbfcc9649ee3fa13192a1
Author: caiconghui <55968745+caicong...@users.noreply.github.com>
AuthorDate: Fri Feb 3 15:06:45 2023 +0800

    [fix](broker) Fix bug that heavy broker load may failed due to BrokerException which indicate the fd is not owned by client (#16350)
    
    Co-authored-by: caiconghui1 <caicongh...@jd.com>
---
 .../apache/doris/broker/hdfs/BrokerFileSystem.java |  2 +-
 .../doris/broker/hdfs/ClientContextManager.java    | 23 +++++++++++++++-------
 2 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java
index c8a217d205..22f6925d31 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java
@@ -31,7 +31,7 @@ public class BrokerFileSystem {
     private ReentrantLock lock;
     private FileSystemIdentity identity;
     private FileSystem dfsFileSystem;
-    private long lastAccessTimestamp;
+    private volatile long lastAccessTimestamp;
     private long createTimestamp;
     private UUID fileSystemId;
 
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
index e52f248e11..736f9ae448 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
@@ -131,7 +131,7 @@ public class ClientContextManager {
         public void run() {
             try {
                 for (ClientResourceContext clientContext : clientContexts.values()) {
-                    if (System.currentTimeMillis() - clientContext.lastPingTimestamp > clientExpirationSeconds * 1000) {
+                    if (System.currentTimeMillis() - clientContext.lastAccessTimestamp > clientExpirationSeconds * 1000) {
                         for (TBrokerFD fd : clientContext.inputStreams.keySet()) {
                             ClientContextManager.this.removeInputStream(fd);
                         }
@@ -139,9 +139,9 @@ public class ClientContextManager {
                             ClientContextManager.this.removeOutputStream(fd);
                         }
                         clientContexts.remove(clientContext.clientId);
-                        logger.info("client [" + clientContext.clientId
-                            + "] is expired, remove it from contexts. last ping time is "
-                            + clientContext.lastPingTimestamp);
+                        logger.info("client [" + clientContext.clientId 
+                                + "] is expired, remove it from contexts. last access time is "
+                                + clientContext.lastAccessTimestamp);
                     }
                 }
             } finally {
@@ -197,24 +197,28 @@ public class ClientContextManager {
         private String clientId;
         private ConcurrentHashMap<TBrokerFD, BrokerInputStream> inputStreams;
         private ConcurrentHashMap<TBrokerFD, BrokerOutputStream> outputStreams;
-        private long lastPingTimestamp;
+
+        private volatile long lastAccessTimestamp;
 
         public ClientResourceContext(String clientId) {
             this.clientId = clientId;
             this.inputStreams = new ConcurrentHashMap<>();
             this.outputStreams = new ConcurrentHashMap<>();
-            this.lastPingTimestamp = System.currentTimeMillis();
+            this.lastAccessTimestamp = System.currentTimeMillis();
         }
 
         public void putInputStream(TBrokerFD fd, FSDataInputStream inputStream, BrokerFileSystem fileSystem) {
+            updateLastAccessTime();
             inputStreams.putIfAbsent(fd, new BrokerInputStream(inputStream, fileSystem));
         }
 
         public void putOutputStream(TBrokerFD fd, FSDataOutputStream outputStream, BrokerFileSystem fileSystem) {
+            updateLastAccessTime();
             outputStreams.putIfAbsent(fd, new BrokerOutputStream(outputStream, fileSystem));
         }
 
         public FSDataInputStream getInputStream(TBrokerFD fd) {
+            updateLastAccessTime();
             BrokerInputStream brokerInputStream = inputStreams.get(fd);
             if (brokerInputStream != null) {
                 return brokerInputStream.getInputStream();
@@ -223,6 +227,7 @@ public class ClientContextManager {
         }
 
         public FSDataOutputStream getOutputStream(TBrokerFD fd) {
+            updateLastAccessTime();
             BrokerOutputStream brokerOutputStream = outputStreams.get(fd);
             if (brokerOutputStream != null) {
                 return brokerOutputStream.getOutputStream();
@@ -230,8 +235,12 @@ public class ClientContextManager {
             return null;
         }
 
+        public void updateLastAccessTime() {
+            this.lastAccessTimestamp = System.currentTimeMillis();
+        }
+
         public void updateLastPingTime() {
-            this.lastPingTimestamp = System.currentTimeMillis();
+            this.lastAccessTimestamp = System.currentTimeMillis();
             // Should we also update the underline filesystem? maybe it is time cost
             for (BrokerInputStream brokerInputStream : inputStreams.values()) {
                 brokerInputStream.updateLastUpdateAccessTime();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to