This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 67fe2c86dc5 [Feature](executor)Support ShowProcessStmt Show all Fe connection (#30907) (#30996)
67fe2c86dc5 is described below

commit 67fe2c86dc520d62946ccf6a8d37d14922e1466b
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Feb 8 20:01:38 2024 +0800

    [Feature](executor)Support ShowProcessStmt Show all Fe connection (#30907) (#30996)
---
 .../apache/doris/analysis/ShowProcesslistStmt.java | 24 +++++++++-
 .../doris/common/proc/FrontendsProcNode.java       | 18 ++++++++
 .../doris/httpv2/controller/SessionController.java |  2 +-
 .../java/org/apache/doris/qe/ConnectContext.java   |  7 ++-
 .../java/org/apache/doris/qe/ConnectScheduler.java | 11 +++++
 .../java/org/apache/doris/qe/SessionVariable.java  | 11 +++++
 .../java/org/apache/doris/qe/ShowExecutor.java     | 51 ++++++++++++++++++++--
 .../apache/doris/service/FrontendServiceImpl.java  | 16 +++++++
 .../org/apache/doris/qe/ConnectContextTest.java    |  2 +-
 gensrc/thrift/FrontendService.thrift               | 10 +++++
 10 files changed, 145 insertions(+), 7 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java
index 3b6c67b1bba..96e8a082249 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java
@@ -20,6 +20,7 @@ package org.apache.doris.analysis;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSetMetaData;
 
 // SHOW PROCESSLIST statement.
@@ -39,7 +40,23 @@ public class ShowProcesslistStmt extends ShowStmt {
             .addColumn(new Column("QueryId", ScalarType.createVarchar(64)))
             .addColumn(new Column("Info", ScalarType.STRING)).build();
 
+    private static final ShowResultSetMetaData ALL_META_DATA = 
ShowResultSetMetaData.builder()
+            .addColumn(new Column("CurrentConnected", 
ScalarType.createVarchar(16)))
+            .addColumn(new Column("Id", 
ScalarType.createType(PrimitiveType.BIGINT)))
+            .addColumn(new Column("User", ScalarType.createVarchar(16)))
+            .addColumn(new Column("Host", ScalarType.createVarchar(16)))
+            .addColumn(new Column("LoginTime", ScalarType.createVarchar(16)))
+            .addColumn(new Column("Catalog", ScalarType.createVarchar(16)))
+            .addColumn(new Column("Db", ScalarType.createVarchar(16)))
+            .addColumn(new Column("Command", ScalarType.createVarchar(16)))
+            .addColumn(new Column("Time", 
ScalarType.createType(PrimitiveType.INT)))
+            .addColumn(new Column("State", ScalarType.createVarchar(64)))
+            .addColumn(new Column("QueryId", ScalarType.createVarchar(64)))
+            .addColumn(new Column("Info", ScalarType.STRING))
+            .addColumn(new Column("FE", ScalarType.createVarchar(16))).build();
+
     private boolean isFull;
+    private boolean isShowAllFe;
 
     public ShowProcesslistStmt(boolean isFull) {
         this.isFull = isFull;
@@ -51,6 +68,11 @@ public class ShowProcesslistStmt extends ShowStmt {
 
     @Override
     public void analyze(Analyzer analyzer) {
+        this.isShowAllFe = 
ConnectContext.get().getSessionVariable().getShowAllFeConnection();
+    }
+
+    public boolean isShowAllFe() {
+        return isShowAllFe;
     }
 
     @Override
@@ -65,6 +87,6 @@ public class ShowProcesslistStmt extends ShowStmt {
 
     @Override
     public ShowResultSetMetaData getMetaData() {
-        return META_DATA;
+        return isShowAllFe ? ALL_META_DATA : META_DATA;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
index a88315ee858..b5d2fa7d704 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
@@ -19,6 +19,7 @@ package org.apache.doris.common.proc;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.system.Frontend;
@@ -71,6 +72,23 @@ public class FrontendsProcNode implements ProcNodeInterface {
         return result;
     }
 
+    public static List<Pair<String, Integer>> getFrontendWithRpcPort(Env env, 
boolean includeSelf) {
+        List<Pair<String, Integer>> allFe = new ArrayList<>();
+        List<Frontend> frontends = env.getFrontends(null);
+
+        String selfNode = Env.getCurrentEnv().getSelfNode().getHost();
+        if (ConnectContext.get() != null && 
!Strings.isNullOrEmpty(ConnectContext.get().getCurrentConnectedFEIp())) {
+            selfNode = ConnectContext.get().getCurrentConnectedFEIp();
+        }
+
+        String finalSelfNode = selfNode;
+        frontends.stream()
+                .filter(fe -> (!fe.getHost().equals(finalSelfNode) || 
includeSelf))
+                .map(fe -> Pair.of(fe.getHost(), fe.getRpcPort()))
+                .forEach(allFe::add);
+        return allFe;
+    }
+
     public static void getFrontendsInfo(Env env, List<List<String>> infos) {
         InetSocketAddress master = null;
         try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
index 4f20fc8c9ef..10577421982 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
@@ -74,7 +74,7 @@ public class SessionController extends BaseController {
 
         long nowMs = System.currentTimeMillis();
         for (ConnectContext.ThreadInfo info : threadInfos) {
-            rows.add(info.toRow(-1, nowMs));
+            rows.add(info.toRow(-1, nowMs, false));
         }
 
         for (List<String> row : rows) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index aeb230557fd..6f9527f274b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -822,7 +822,7 @@ public class ConnectContext {
     public class ThreadInfo {
         public boolean isFull;
 
-        public List<String> toRow(int connId, long nowMs) {
+        public List<String> toRow(int connId, long nowMs, boolean showFe) {
             List<String> row = Lists.newArrayList();
             if (connId == connectionId) {
                 row.add("Yes");
@@ -850,6 +850,11 @@ public class ConnectContext {
             } else {
                 row.add("");
             }
+
+            if (showFe) {
+                row.add(Env.getCurrentEnv().getSelfNode().getHost());
+            }
+
             return row;
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index bbc31a32dc7..b8f2c64390f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TimerTask;
@@ -159,6 +160,16 @@ public class ConnectScheduler {
         return infos;
     }
 
+    // used for thrift
+    public List<List<String>> listConnectionWithoutAuth(boolean isShowFullSql, 
boolean isShowFeHost) {
+        List<List<String>> list = new ArrayList<>();
+        long nowMs = System.currentTimeMillis();
+        for (ConnectContext ctx : connectionMap.values()) {
+            list.add(ctx.toThreadInfo(isShowFullSql).toRow(-1, nowMs, 
isShowFeHost));
+        }
+        return list;
+    }
+
     public void putTraceId2QueryId(String traceId, TUniqueId queryId) {
         traceId2QueryId.put(traceId, queryId);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 09fd164eb2d..f6f232fb92e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -443,6 +443,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String FORCE_JNI_SCANNER = "force_jni_scanner";
 
+    public static final String SHOW_ALL_FE_CONNECTION = 
"show_all_fe_connection";
+
     public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
             SKIP_DELETE_PREDICATE,
             SKIP_DELETE_BITMAP,
@@ -1331,6 +1333,11 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String IGNORE_RUNTIME_FILTER_IDS = 
"ignore_runtime_filter_ids";
 
+    @VariableMgr.VarAttr(name = SHOW_ALL_FE_CONNECTION,
+            description = {"when it's true show processlist statement list all 
fe's connection",
+                    "当变量为true时,show processlist命令展示所有fe的连接"})
+    public boolean showAllFeConnection = false;
+
     public Set<Integer> getIgnoredRuntimeFilterIds() {
         return Arrays.stream(ignoreRuntimeFilterIds.split(",[\\s]*"))
                 .map(v -> {
@@ -2847,5 +2854,9 @@ public class SessionVariable implements Serializable, 
Writable {
     public void setForceJniScanner(boolean force) {
         forceJniScanner = force;
     }
+
+    public boolean getShowAllFeConnection() {
+        return this.showAllFeConnection;
+    }
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index abbaed302c8..c110fdd3797 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -144,6 +144,7 @@ import org.apache.doris.clone.DynamicPartitionScheduler;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ConfigBase;
 import org.apache.doris.common.DdlException;
@@ -209,7 +210,11 @@ import org.apache.doris.task.AgentClient;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.SnapshotTask;
+import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.TCheckStorageFormatResult;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TShowProcessListRequest;
+import org.apache.doris.thrift.TShowProcessListResult;
 import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.thrift.TUnit;
 import org.apache.doris.transaction.GlobalTransactionMgr;
@@ -453,13 +458,53 @@ public class ShowExecutor {
     // Handle show processlist
     private void handleShowProcesslist() {
         ShowProcesslistStmt showStmt = (ShowProcesslistStmt) stmt;
-        List<List<String>> rowSet = Lists.newArrayList();
+        boolean isShowFullSql = showStmt.isFull();
+        boolean isShowAllFe = showStmt.isShowAllFe();
 
+        List<List<String>> rowSet = Lists.newArrayList();
         List<ConnectContext.ThreadInfo> threadInfos = ctx.getConnectScheduler()
-                .listConnection(ctx.getQualifiedUser(), showStmt.isFull());
+                .listConnection(ctx.getQualifiedUser(), isShowFullSql);
         long nowMs = System.currentTimeMillis();
         for (ConnectContext.ThreadInfo info : threadInfos) {
-            rowSet.add(info.toRow(ctx.getConnectionId(), nowMs));
+            rowSet.add(info.toRow(ctx.getConnectionId(), nowMs, isShowAllFe));
+        }
+
+        if (isShowAllFe) {
+            try {
+                TShowProcessListRequest request = new 
TShowProcessListRequest();
+                request.setShowFullSql(isShowFullSql);
+                List<Pair<String, Integer>> frontends = 
FrontendsProcNode.getFrontendWithRpcPort(Env.getCurrentEnv(),
+                        false);
+                FrontendService.Client client = null;
+                for (Pair<String, Integer> fe : frontends) {
+                    TNetworkAddress thriftAddress = new 
TNetworkAddress(fe.key(), fe.value());
+                    try {
+                        client = 
ClientPool.frontendPool.borrowObject(thriftAddress, 3000);
+                    } catch (Exception e) {
+                        LOG.warn("Failed to get frontend {} client. exception: 
{}", fe.key(), e);
+                        continue;
+                    }
+
+                    boolean isReturnToPool = false;
+                    try {
+                        TShowProcessListResult result = 
client.showProcessList(request);
+                        if (result.process_list != null && 
result.process_list.size() > 0) {
+                            rowSet.addAll(result.process_list);
+                        }
+                        isReturnToPool = true;
+                    } catch (Exception e) {
+                        LOG.warn("Failed to request processlist to fe: {} . 
exception: {}", fe.key(), e);
+                    } finally {
+                        if (isReturnToPool) {
+                            
ClientPool.frontendPool.returnObject(thriftAddress, client);
+                        } else {
+                            
ClientPool.frontendPool.invalidateObject(thriftAddress, client);
+                        }
+                    }
+                }
+            } catch (Throwable t) {
+                LOG.warn(" fetch process list from other fe failed, ", t);
+            }
         }
 
         resultSet = new ShowResultSet(showStmt.getMetaData(), rowSet);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index d70243804a6..d1cad65d49d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -172,6 +172,8 @@ import org.apache.doris.thrift.TRestoreSnapshotRequest;
 import org.apache.doris.thrift.TRestoreSnapshotResult;
 import org.apache.doris.thrift.TRollbackTxnRequest;
 import org.apache.doris.thrift.TRollbackTxnResult;
+import org.apache.doris.thrift.TShowProcessListRequest;
+import org.apache.doris.thrift.TShowProcessListResult;
 import org.apache.doris.thrift.TShowVariableRequest;
 import org.apache.doris.thrift.TShowVariableResult;
 import org.apache.doris.thrift.TSnapshotLoaderReportRequest;
@@ -3284,4 +3286,18 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             throw e;
         }
     }
+
+    @Override
+    public TShowProcessListResult showProcessList(TShowProcessListRequest 
request) {
+        boolean isShowFullSql = false;
+        if (request.isSetShowFullSql()) {
+            isShowFullSql = request.isShowFullSql();
+        }
+        List<List<String>> processList = 
ExecuteEnv.getInstance().getScheduler()
+                .listConnectionWithoutAuth(isShowFullSql, true);
+        TShowProcessListResult result = new TShowProcessListResult();
+        result.setProcessList(processList);
+        return result;
+    }
+
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
index 85b60f53e3a..a15bdd8178a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
@@ -104,7 +104,7 @@ public class ConnectContextTest {
 
         // Thread info
         Assert.assertNotNull(ctx.toThreadInfo(false));
-        List<String> row = ctx.toThreadInfo(false).toRow(101, 1000);
+        List<String> row = ctx.toThreadInfo(false).toRow(101, 1000, false);
         Assert.assertEquals(12, row.size());
         Assert.assertEquals("Yes", row.get(0));
         Assert.assertEquals("101", row.get(1));
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 910a3763028..eaf8d0cd330 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1245,6 +1245,14 @@ struct TGetBackendMetaResult {
     3: optional Types.TNetworkAddress master_address
 }
 
+struct TShowProcessListRequest {
+    1: optional bool show_full_sql
+}
+
+struct TShowProcessListResult {
+    1: optional list<list<string>> process_list
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1318,4 +1326,6 @@ service FrontendService {
     TGetBackendMetaResult getBackendMeta(1: TGetBackendMetaRequest request)
 
     Status.TStatus invalidateStatsCache(1: 
TInvalidateFollowerStatsCacheRequest request)
+
+    TShowProcessListResult showProcessList(1: TShowProcessListRequest request)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to