This is an automated email from the ASF dual-hosted git repository.

hellostephen pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new d5c43c51b5a [branch-2.1](fe) Avoid interrupt daemon thread and use proper polling interval (#42210) (#42646)
d5c43c51b5a is described below

commit d5c43c51b5a50efd8f5ea14cff540fdd1383da72
Author: zclllhhjj <zhaochan...@selectdb.com>
AuthorDate: Tue Oct 29 10:10:26 2024 +0800

    [branch-2.1](fe) Avoid interrupt daemon thread and use proper polling interval (#42210) (#42646)
    
    pick https://github.com/apache/doris/pull/42210
---
 .../main/java/org/apache/doris/catalog/Env.java    |  3 +-
 .../doris/clone/DynamicPartitionScheduler.java     | 45 ++++++++++++++++++++++
 .../java/org/apache/doris/common/util/Daemon.java  | 11 ++++--
 3 files changed, 54 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index f60f4634c0b..0d9ad091fcb 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -5579,8 +5579,9 @@ public class Env {
         ConfigBase.setMutableConfig(key, value);
         if (configtoThreads.get(key) != null) {
             try {
+                // not atomic. maybe delay to aware. but acceptable.
                 
configtoThreads.get(key).get().setInterval(Config.getField(key).getLong(null) * 
1000L);
-                configtoThreads.get(key).get().interrupt();
+                // shouldn't interrupt to keep possible bdbje writing safe.
                 LOG.info("set config " + key + " to " + value);
             } catch (IllegalAccessException e) {
                 LOG.warn("set config " + key + " failed: " + e.getMessage());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index 1b00f041964..51dc7dd802f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -50,6 +50,7 @@ import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.RangeUtils;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.meta.MetaContext;
 import org.apache.doris.thrift.TStorageMedium;
 
 import com.google.common.base.Strings;
@@ -88,6 +89,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {
 
     private static final String DEFAULT_RUNTIME_VALUE = 
FeConstants.null_string;
 
+    private static final long SLEEP_PIECE = 5000L;
+
     private Map<Long, Map<String, String>> runtimeInfos = 
Maps.newConcurrentMap();
     private Set<Pair<Long, Long>> dynamicPartitionTableInfo = 
Sets.newConcurrentHashSet();
     private boolean initialize;
@@ -663,6 +666,48 @@ public class DynamicPartitionScheduler extends 
MasterDaemon {
         initialize = true;
     }
 
+    // specialized schedule logic. split sleep to many small pieces. so if 
interval changed, it won't take too much
+    // time to aware.
+    @Override
+    public void run() {
+        if (metaContext != null) {
+            metaContext.setThreadLocalInfo();
+        }
+
+        while (!isStop.get()) {
+            try {
+                runOneCycle();
+            } catch (Throwable e) {
+                LOG.error("daemon thread got exception. name: {}", getName(), 
e);
+            }
+
+            try {
+                long oldInterval = intervalMs;
+                long remainingInterval = oldInterval;
+                while (remainingInterval > SLEEP_PIECE) {
+                    // if it changed. let it know at most 10 seconds. and 5 
second per wakeup is acceptable.
+                    if (intervalMs != oldInterval) { // changed
+                        break;
+                    }
+
+                    Thread.sleep(SLEEP_PIECE);
+                    remainingInterval -= SLEEP_PIECE;
+                }
+                if (remainingInterval <= SLEEP_PIECE) {
+                    Thread.sleep(remainingInterval);
+                }
+            } catch (InterruptedException e) {
+                // This thread should NEVER be interrupted. or meet bdbje 
writing, it will be disaster.
+                LOG.fatal("InterruptedException: ", e);
+            }
+        }
+
+        if (metaContext != null) {
+            MetaContext.remove();
+        }
+        LOG.error("daemon thread exits. name=" + this.getName());
+    }
+
     @Override
     protected void runAfterCatalogReady() {
         if (!initialize) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Daemon.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/Daemon.java
index 472285b4764..4678f78d668 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Daemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Daemon.java
@@ -28,12 +28,15 @@ public class Daemon extends Thread {
     private static final Logger LOG = LogManager.getLogger(Daemon.class);
     private static final int DEFAULT_INTERVAL_SECONDS = 30; // 30 seconds
 
-    private long intervalMs;
-    private AtomicBoolean isStop;
+    protected long intervalMs;
+
+    protected AtomicBoolean isStop;
+
+    protected MetaContext metaContext = null;
+
     private Runnable runnable;
-    private AtomicBoolean isStart = new AtomicBoolean(false);
 
-    private MetaContext metaContext = null;
+    private AtomicBoolean isStart = new AtomicBoolean(false);
 
     {
         setDaemon(true);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to