This is an automated email from the ASF dual-hosted git repository.

hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 641e6546c2d [feature](regression-framework) print current pipeline 
tasks every mi… (#54850)
641e6546c2d is described below

commit 641e6546c2d914d618ebdff5cac4996e3a2409e4
Author: shuke <sh...@selectdb.com>
AuthorDate: Fri Aug 15 14:56:08 2025 +0800

    [feature](regression-framework) print current pipeline tasks every mi… 
(#54850)
    
    …nutes
---
 .../plugins/plugin_query_timeout_debugger.groovy   | 155 +++++++++++++++++++++
 1 file changed, 155 insertions(+)

diff --git a/regression-test/plugins/plugin_query_timeout_debugger.groovy 
b/regression-test/plugins/plugin_query_timeout_debugger.groovy
new file mode 100644
index 00000000000..5d863b9a754
--- /dev/null
+++ b/regression-test/plugins/plugin_query_timeout_debugger.groovy
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.sql.DriverManager
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import org.apache.doris.regression.suite.Suite
+import org.apache.doris.regression.util.JdbcUtils
+import org.slf4j.Logger
+
+// make sure PluginQueryTimeoutDebugger quit gracefully
+class PluginQueryTimeoutDebuggerHolder {
+    static final PluginQueryTimeoutDebugger staticResource = new 
PluginQueryTimeoutDebugger()
+
+    static {
+        Runtime.runtime.addShutdownHook {
+            staticResource?.stopWorker()
+        }
+    }
+}
+
+PluginQueryTimeoutDebugger.jdbcUrl = context.config.jdbcUrl
+PluginQueryTimeoutDebugger.jdbcUser = context.config.jdbcUser
+PluginQueryTimeoutDebugger.jdbcPassword = context.config.jdbcPassword
+PluginQueryTimeoutDebugger.logger = logger
+PluginQueryTimeoutDebuggerHolder.staticResource.startWorker()
+
+/**
+ * print pipeline tasks every 1 minutes to help debugging query timeout.
+ * be list refreshed every 5 minutes.
+ */
+class PluginQueryTimeoutDebugger {
+    static private final String HostColumnName = "Host"
+    static private final String PortColumnName = "HttpPort"
+    static private final int HTTP_TIMEOUT = 5000
+    static private final long BACKEND_REFRESH_INTERVAL = 5 * 60 * 1000
+
+    static public String jdbcUrl
+    static public String jdbcUser
+    static public String jdbcPassword
+    static public Logger logger
+
+    private ScheduledExecutorService scheduler
+    private List<String> backendUrls = []
+    private long lastBackendRefreshTime = 0
+
+    // catch all exceptions in timer function.
+    private void startWorker() {
+        if (scheduler?.isShutdown() == false) {
+            logger.warn("worker already started")
+            return
+        }
+
+        scheduler = Executors.newSingleThreadScheduledExecutor { r ->
+            Thread thread = new Thread(r)
+            thread.setName("query-timeout-debugger-thread")
+            thread.setDaemon(true)
+            return thread
+        }
+        scheduler.scheduleAtFixedRate({
+            try {
+                work()
+            } catch (Exception e) {
+                logger.warn("work exception: ${e.getMessage()}", e)
+            }
+        }, 0, 1, TimeUnit.MINUTES)
+
+        logger.info("worker started with scheduler")
+    }
+
+    private void stopWorker() {
+        logger.info("stop worker")
+        if (scheduler != null) {
+            scheduler.shutdown()
+            if (!scheduler.awaitTermination(3, TimeUnit.SECONDS)) {
+                scheduler.shutdownNow()
+                logger.warn("worker scheduler forced to stop")
+            }
+        }
+    }
+
+    private void work() {
+        initBackendsUrls()
+        for (String url : backendUrls) {    
+            logger.info("${url} pipeline tasks: ${curl(url)}")
+        }
+    }
+
+    void initBackendsUrls() {
+        // refreshed every BACKEND_REFRESH_INTERVAL.
+        long now = System.currentTimeMillis()
+        if (!backendUrls.isEmpty() && (now - lastBackendRefreshTime < 
BACKEND_REFRESH_INTERVAL)) {
+            return
+        }
+        lastBackendRefreshTime = now
+
+        List<String> urls = []
+        DriverManager.getConnection(jdbcUrl, jdbcUser, 
jdbcPassword).withCloseable { conn ->
+            def (result, meta) = JdbcUtils.executeToList(conn, "show backends")
+            int hostIndex = -1
+            int portIndex = -1
+
+            for (int i = 0; i < meta.getColumnCount(); i++) {
+                if (meta.getColumnLabel(i+1) == HostColumnName) {
+                    hostIndex = i
+                } else if (meta.getColumnLabel(i+1) == PortColumnName) {
+                    portIndex = i
+                }
+            }
+
+            if (hostIndex != -1 && portIndex != -1) {
+                for (int i = 0; i < result.size(); i++) {
+                    
urls.add(String.format("http://%s:%s/api/running_pipeline_tasks/180";, 
result.get(i).get(hostIndex), result.get(i).get(portIndex)))
+                }
+                backendUrls = urls
+            }
+        }
+
+        logger.info("backends: ${backendUrls}")
+    }
+
+    String curl(String urlStr) {
+        HttpURLConnection connection = null
+        try {
+            URL url = new URL(urlStr)
+            connection = url.openConnection() as HttpURLConnection
+            connection.requestMethod = "GET"
+            connection.connectTimeout = HTTP_TIMEOUT
+            connection.readTimeout = HTTP_TIMEOUT
+            
+            if (connection.responseCode == 200) {
+                return connection.inputStream.text
+            } else {
+                throw new Exception("curl ${urlStr} failed, code: 
${connection.responseCode}")
+            }
+        } finally {
+            connection?.disconnect()
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to