This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 67fe2c86dc5 [Feature](executor)Support ShowProcessStmt Show all Fe connection (#30907) (#30996) 67fe2c86dc5 is described below commit 67fe2c86dc520d62946ccf6a8d37d14922e1466b Author: wangbo <wan...@apache.org> AuthorDate: Thu Feb 8 20:01:38 2024 +0800 [Feature](executor)Support ShowProcessStmt Show all Fe connection (#30907) (#30996) --- .../apache/doris/analysis/ShowProcesslistStmt.java | 24 +++++++++- .../doris/common/proc/FrontendsProcNode.java | 18 ++++++++ .../doris/httpv2/controller/SessionController.java | 2 +- .../java/org/apache/doris/qe/ConnectContext.java | 7 ++- .../java/org/apache/doris/qe/ConnectScheduler.java | 11 +++++ .../java/org/apache/doris/qe/SessionVariable.java | 11 +++++ .../java/org/apache/doris/qe/ShowExecutor.java | 51 ++++++++++++++++++++-- .../apache/doris/service/FrontendServiceImpl.java | 16 +++++++ .../org/apache/doris/qe/ConnectContextTest.java | 2 +- gensrc/thrift/FrontendService.thrift | 10 +++++ 10 files changed, 145 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java index 3b6c67b1bba..96e8a082249 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java @@ -20,6 +20,7 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSetMetaData; // SHOW PROCESSLIST statement. @@ -39,7 +40,23 @@ public class ShowProcesslistStmt extends ShowStmt { .addColumn(new Column("QueryId", ScalarType.createVarchar(64))) .addColumn(new Column("Info", ScalarType.STRING)).build(); + private static final ShowResultSetMetaData ALL_META_DATA = ShowResultSetMetaData.builder() + .addColumn(new Column("CurrentConnected", ScalarType.createVarchar(16))) + .addColumn(new Column("Id", ScalarType.createType(PrimitiveType.BIGINT))) + .addColumn(new Column("User", ScalarType.createVarchar(16))) + .addColumn(new Column("Host", ScalarType.createVarchar(16))) + .addColumn(new Column("LoginTime", ScalarType.createVarchar(16))) + .addColumn(new Column("Catalog", ScalarType.createVarchar(16))) + .addColumn(new Column("Db", ScalarType.createVarchar(16))) + .addColumn(new Column("Command", ScalarType.createVarchar(16))) + .addColumn(new Column("Time", ScalarType.createType(PrimitiveType.INT))) + .addColumn(new Column("State", ScalarType.createVarchar(64))) + .addColumn(new Column("QueryId", ScalarType.createVarchar(64))) + .addColumn(new Column("Info", ScalarType.STRING)) + .addColumn(new Column("FE", ScalarType.createVarchar(16))).build(); + private boolean isFull; + private boolean isShowAllFe; public ShowProcesslistStmt(boolean isFull) { this.isFull = isFull; @@ -51,6 +68,11 @@ public class ShowProcesslistStmt extends ShowStmt { @Override public void analyze(Analyzer analyzer) { + this.isShowAllFe = ConnectContext.get().getSessionVariable().getShowAllFeConnection(); + } + + public boolean isShowAllFe() { + return isShowAllFe; } @Override @@ -65,6 +87,6 @@ public class ShowProcesslistStmt extends ShowStmt { @Override public ShowResultSetMetaData getMetaData() { - return META_DATA; + return isShowAllFe ? ALL_META_DATA : META_DATA; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java index a88315ee858..b5d2fa7d704 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java @@ -19,6 +19,7 @@ package org.apache.doris.common.proc; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.Frontend; @@ -71,6 +72,23 @@ public class FrontendsProcNode implements ProcNodeInterface { return result; } + public static List<Pair<String, Integer>> getFrontendWithRpcPort(Env env, boolean includeSelf) { + List<Pair<String, Integer>> allFe = new ArrayList<>(); + List<Frontend> frontends = env.getFrontends(null); + + String selfNode = Env.getCurrentEnv().getSelfNode().getHost(); + if (ConnectContext.get() != null && !Strings.isNullOrEmpty(ConnectContext.get().getCurrentConnectedFEIp())) { + selfNode = ConnectContext.get().getCurrentConnectedFEIp(); + } + + String finalSelfNode = selfNode; + frontends.stream() + .filter(fe -> (!fe.getHost().equals(finalSelfNode) || includeSelf)) + .map(fe -> Pair.of(fe.getHost(), fe.getRpcPort())) + .forEach(allFe::add); + return allFe; + } + public static void getFrontendsInfo(Env env, List<List<String>> infos) { InetSocketAddress master = null; try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java index 4f20fc8c9ef..10577421982 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java @@ -74,7 +74,7 @@ public class SessionController extends BaseController { long nowMs = System.currentTimeMillis(); for (ConnectContext.ThreadInfo info : threadInfos) { - rows.add(info.toRow(-1, nowMs)); + rows.add(info.toRow(-1, nowMs, false)); } for (List<String> row : rows) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index aeb230557fd..6f9527f274b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -822,7 +822,7 @@ public class ConnectContext { public class ThreadInfo { public boolean isFull; - public List<String> toRow(int connId, long nowMs) { + public List<String> toRow(int connId, long nowMs, boolean showFe) { List<String> row = Lists.newArrayList(); if (connId == connectionId) { row.add("Yes"); @@ -850,6 +850,11 @@ public class ConnectContext { } else { row.add(""); } + + if (showFe) { + row.add(Env.getCurrentEnv().getSelfNode().getHost()); + } + return row; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java index bbc31a32dc7..b8f2c64390f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -29,6 +29,7 @@ import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TimerTask; @@ -159,6 +160,16 @@ public class ConnectScheduler { return infos; } + // used for thrift + public List<List<String>> listConnectionWithoutAuth(boolean isShowFullSql, boolean isShowFeHost) { + List<List<String>> list = new ArrayList<>(); + long nowMs = System.currentTimeMillis(); + for (ConnectContext ctx : connectionMap.values()) { + list.add(ctx.toThreadInfo(isShowFullSql).toRow(-1, nowMs, isShowFeHost)); + } + return list; + } + public void putTraceId2QueryId(String traceId, TUniqueId queryId) { traceId2QueryId.put(traceId, queryId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 09fd164eb2d..f6f232fb92e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -443,6 +443,8 @@ public class SessionVariable implements Serializable, Writable { public static final String FORCE_JNI_SCANNER = "force_jni_scanner"; + public static final String SHOW_ALL_FE_CONNECTION = "show_all_fe_connection"; + public static final List<String> DEBUG_VARIABLES = ImmutableList.of( SKIP_DELETE_PREDICATE, SKIP_DELETE_BITMAP, @@ -1331,6 +1333,11 @@ public class SessionVariable implements Serializable, Writable { public static final String IGNORE_RUNTIME_FILTER_IDS = "ignore_runtime_filter_ids"; + @VariableMgr.VarAttr(name = SHOW_ALL_FE_CONNECTION, + description = {"when it's true show processlist statement list all fe's connection", + "当变量为true时,show processlist命令展示所有fe的连接"}) + public boolean showAllFeConnection = false; + public Set<Integer> getIgnoredRuntimeFilterIds() { return Arrays.stream(ignoreRuntimeFilterIds.split(",[\\s]*")) .map(v -> { @@ -2847,5 +2854,9 @@ public class SessionVariable implements Serializable, Writable { public void setForceJniScanner(boolean force) { forceJniScanner = force; } + + public boolean getShowAllFeConnection() { + return this.showAllFeConnection; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index abbaed302c8..c110fdd3797 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -144,6 +144,7 @@ import org.apache.doris.clone.DynamicPartitionScheduler; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CaseSensibility; +import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; import org.apache.doris.common.ConfigBase; import org.apache.doris.common.DdlException; @@ -209,7 +210,11 @@ import org.apache.doris.task.AgentClient; import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.SnapshotTask; +import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.TCheckStorageFormatResult; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TShowProcessListRequest; +import org.apache.doris.thrift.TShowProcessListResult; import org.apache.doris.thrift.TTaskType; import org.apache.doris.thrift.TUnit; import org.apache.doris.transaction.GlobalTransactionMgr; @@ -453,13 +458,53 @@ public class ShowExecutor { // Handle show processlist private void handleShowProcesslist() { ShowProcesslistStmt showStmt = (ShowProcesslistStmt) stmt; - List<List<String>> rowSet = Lists.newArrayList(); + boolean isShowFullSql = showStmt.isFull(); + boolean isShowAllFe = showStmt.isShowAllFe(); + List<List<String>> rowSet = Lists.newArrayList(); List<ConnectContext.ThreadInfo> threadInfos = ctx.getConnectScheduler() - .listConnection(ctx.getQualifiedUser(), showStmt.isFull()); + .listConnection(ctx.getQualifiedUser(), isShowFullSql); long nowMs = System.currentTimeMillis(); for (ConnectContext.ThreadInfo info : threadInfos) { - rowSet.add(info.toRow(ctx.getConnectionId(), nowMs)); + rowSet.add(info.toRow(ctx.getConnectionId(), nowMs, isShowAllFe)); + } + + if (isShowAllFe) { + try { + TShowProcessListRequest request = new TShowProcessListRequest(); + request.setShowFullSql(isShowFullSql); + List<Pair<String, Integer>> frontends = FrontendsProcNode.getFrontendWithRpcPort(Env.getCurrentEnv(), + false); + FrontendService.Client client = null; + for (Pair<String, Integer> fe : frontends) { + TNetworkAddress thriftAddress = new TNetworkAddress(fe.key(), fe.value()); + try { + client = ClientPool.frontendPool.borrowObject(thriftAddress, 3000); + } catch (Exception e) { + LOG.warn("Failed to get frontend {} client. exception: {}", fe.key(), e); + continue; + } + + boolean isReturnToPool = false; + try { + TShowProcessListResult result = client.showProcessList(request); + if (result.process_list != null && result.process_list.size() > 0) { + rowSet.addAll(result.process_list); + } + isReturnToPool = true; + } catch (Exception e) { + LOG.warn("Failed to request processlist to fe: {} . exception: {}", fe.key(), e); + } finally { + if (isReturnToPool) { + ClientPool.frontendPool.returnObject(thriftAddress, client); + } else { + ClientPool.frontendPool.invalidateObject(thriftAddress, client); + } + } + } + } catch (Throwable t) { + LOG.warn(" fetch process list from other fe failed, ", t); + } } resultSet = new ShowResultSet(showStmt.getMetaData(), rowSet); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index d70243804a6..d1cad65d49d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -172,6 +172,8 @@ import org.apache.doris.thrift.TRestoreSnapshotRequest; import org.apache.doris.thrift.TRestoreSnapshotResult; import org.apache.doris.thrift.TRollbackTxnRequest; import org.apache.doris.thrift.TRollbackTxnResult; +import org.apache.doris.thrift.TShowProcessListRequest; +import org.apache.doris.thrift.TShowProcessListResult; import org.apache.doris.thrift.TShowVariableRequest; import org.apache.doris.thrift.TShowVariableResult; import org.apache.doris.thrift.TSnapshotLoaderReportRequest; @@ -3284,4 +3286,18 @@ public class FrontendServiceImpl implements FrontendService.Iface { throw e; } } + + @Override + public TShowProcessListResult showProcessList(TShowProcessListRequest request) { + boolean isShowFullSql = false; + if (request.isSetShowFullSql()) { + isShowFullSql = request.isShowFullSql(); + } + List<List<String>> processList = ExecuteEnv.getInstance().getScheduler() + .listConnectionWithoutAuth(isShowFullSql, true); + TShowProcessListResult result = new TShowProcessListResult(); + result.setProcessList(processList); + return result; + } + } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java index 85b60f53e3a..a15bdd8178a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java @@ -104,7 +104,7 @@ public class ConnectContextTest { // Thread info Assert.assertNotNull(ctx.toThreadInfo(false)); - List<String> row = ctx.toThreadInfo(false).toRow(101, 1000); + List<String> row = ctx.toThreadInfo(false).toRow(101, 1000, false); Assert.assertEquals(12, row.size()); Assert.assertEquals("Yes", row.get(0)); Assert.assertEquals("101", row.get(1)); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 910a3763028..eaf8d0cd330 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1245,6 +1245,14 @@ struct TGetBackendMetaResult { 3: optional Types.TNetworkAddress master_address } +struct TShowProcessListRequest { + 1: optional bool show_full_sql +} + +struct TShowProcessListResult { + 1: optional list<list<string>> process_list +} + service FrontendService { TGetDbsResult getDbNames(1: TGetDbsParams params) TGetTablesResult getTableNames(1: TGetTablesParams params) @@ -1318,4 +1326,6 @@ service FrontendService { TGetBackendMetaResult getBackendMeta(1: TGetBackendMetaRequest request) Status.TStatus invalidateStatsCache(1: TInvalidateFollowerStatsCacheRequest request) + + TShowProcessListResult showProcessList(1: TShowProcessListRequest request) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org