This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 10aa1643e2e [opt](info) processlist schema table support show all fe (#38701)
10aa1643e2e is described below

commit 10aa1643e2edcc96da55228652a7966649b42eda
Author: wangbo <wan...@apache.org>
AuthorDate: Mon Aug 5 18:26:16 2024 +0800

    [opt](info) processlist schema table support show all fe (#38701)
    
    ## Proposed changes
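    Querying the `processlist` schema table can now return the process list
    of every FE (when the session variable `showAllFeConnection` is enabled),
    instead of only the FE that planned the query.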
---
 be/src/exec/schema_scanner.h                         |  1 +
 .../schema_scanner/schema_processlist_scanner.cpp    | 11 ++++++++---
 be/src/pipeline/exec/schema_scan_operator.cpp        | 11 +++++++++++
 .../java/org/apache/doris/catalog/SchemaTable.java   | 17 ++++++++++++++++-
 .../org/apache/doris/planner/SchemaScanNode.java     | 20 ++++++++++++++++++++
 gensrc/thrift/PlanNodes.thrift                       |  1 +
 6 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/be/src/exec/schema_scanner.h b/be/src/exec/schema_scanner.h
index 4666657af21..da61d58b943 100644
--- a/be/src/exec/schema_scanner.h
+++ b/be/src/exec/schema_scanner.h
@@ -69,6 +69,7 @@ struct SchemaScannerCommonParam {
     int32_t port;                                      // frontend thrift port
     int64_t thread_id;
     const std::string* catalog = nullptr;
+    std::set<TNetworkAddress> fe_addr_list;
 };
 
 // scanner parameter from frontend
diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
index caf3dcc5af9..185ef2ab442 100644
--- a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
@@ -56,9 +56,14 @@ Status SchemaProcessListScanner::start(RuntimeState* state) {
     TShowProcessListRequest request;
     request.__set_show_full_sql(true);
 
-    
RETURN_IF_ERROR(SchemaHelper::show_process_list(*(_param->common_param->ip),
-                                                    
_param->common_param->port, request,
-                                                    &_process_list_result));
+    for (const auto& fe_addr : _param->common_param->fe_addr_list) {
+        TShowProcessListResult tmp_ret;
+        RETURN_IF_ERROR(
+                SchemaHelper::show_process_list(fe_addr.hostname, 
fe_addr.port, request, &tmp_ret));
+        
_process_list_result.process_list.insert(_process_list_result.process_list.end(),
+                                                 tmp_ret.process_list.begin(),
+                                                 tmp_ret.process_list.end());
+    }
 
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp
index f1f3608aa20..8ff0a570e33 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -120,6 +120,17 @@ Status SchemaScanOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state) {
         _common_scanner_param->catalog =
                 state->obj_pool()->add(new 
std::string(tnode.schema_scan_node.catalog));
     }
+
+    if (tnode.schema_scan_node.__isset.fe_addr_list) {
+        for (const auto& fe_addr : tnode.schema_scan_node.fe_addr_list) {
+            _common_scanner_param->fe_addr_list.insert(fe_addr);
+        }
+    } else if (tnode.schema_scan_node.__isset.ip && 
tnode.schema_scan_node.__isset.port) {
+        TNetworkAddress fe_addr;
+        fe_addr.hostname = tnode.schema_scan_node.ip;
+        fe_addr.port = tnode.schema_scan_node.port;
+        _common_scanner_param->fe_addr_list.insert(fe_addr);
+    }
     return Status::OK();
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 1c8f5dde930..8802d266526 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -508,7 +508,7 @@ public class SchemaTable extends Table {
                                     .column("INFO", 
ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
                                     .column("FE",
                                             ScalarType.createVarchar(64))
-                                    .column("CLOUD_CLUSTER", 
ScalarType.createVarchar(64)).build()))
+                                    .column("CLOUD_CLUSTER", 
ScalarType.createVarchar(64)).build(), true))
             .put("workload_policy",
                     new SchemaTable(SystemIdGenerator.getNextId(), 
"workload_policy", TableType.SCHEMA,
                             builder().column("ID", 
ScalarType.createType(PrimitiveType.BIGINT))
@@ -542,10 +542,17 @@ public class SchemaTable extends Table {
             )
             .build();
 
+    private boolean fetchAllFe = false;
+
     protected SchemaTable(long id, String name, TableType type, List<Column> 
baseSchema) {
         super(id, name, type, baseSchema);
     }
 
+    protected SchemaTable(long id, String name, TableType type, List<Column> 
baseSchema, boolean fetchAllFe) {
+        this(id, name, type, baseSchema);
+        this.fetchAllFe = fetchAllFe;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         throw new UnsupportedOperationException("Do not allow to write 
SchemaTable to image.");
@@ -559,6 +566,14 @@ public class SchemaTable extends Table {
         return new Builder();
     }
 
+    public static boolean isShouldFetchAllFe(String schemaTableName) {
+        Table table = TABLE_MAP.get(schemaTableName);
+        if (table != null && table instanceof SchemaTable) {
+            return ((SchemaTable) table).fetchAllFe;
+        }
+        return false;
+    }
+
     /**
      * For TABLE_MAP.
      **/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
index 3287277d0ef..741f765facd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
@@ -19,6 +19,7 @@ package org.apache.doris.planner;
 
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.SchemaTable;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
@@ -27,6 +28,8 @@ import org.apache.doris.datasource.FederationBackendPolicy;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.system.Frontend;
+import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 import org.apache.doris.thrift.TScanRangeLocations;
@@ -38,6 +41,7 @@ import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -96,6 +100,21 @@ public class SchemaScanNode extends ScanNode {
         frontendPort = Config.rpc_port;
     }
 
+    private void setFeAddrList(TPlanNode msg) {
+        if (SchemaTable.isShouldFetchAllFe(tableName)) {
+            List<TNetworkAddress> feAddrList = new ArrayList();
+            if (ConnectContext.get().getSessionVariable().showAllFeConnection) 
{
+                List<Frontend> feList = Env.getCurrentEnv().getFrontends(null);
+                for (Frontend fe : feList) {
+                    feAddrList.add(new TNetworkAddress(fe.getHost(), 
fe.getRpcPort()));
+                }
+            } else {
+                feAddrList.add(new TNetworkAddress(frontendIP, frontendPort));
+            }
+            msg.schema_scan_node.setFeAddrList(feAddrList);
+        }
+    }
+
     @Override
     protected void toThrift(TPlanNode msg) {
         msg.node_type = TPlanNodeType.SCHEMA_SCAN_NODE;
@@ -128,6 +147,7 @@ public class SchemaScanNode extends ScanNode {
 
         TUserIdentity tCurrentUser = 
ConnectContext.get().getCurrentUserIdentity().toThrift();
         msg.schema_scan_node.setCurrentUserIdent(tCurrentUser);
+        setFeAddrList(msg);
     }
 
     @Override
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index cdc5e49decc..26d7983a468 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -698,6 +698,7 @@ struct TSchemaScanNode {
   12: optional bool show_hidden_cloumns = false
   // 13: optional list<TSchemaTableStructure> table_structure // deprecated
   14: optional string catalog
+  15: optional list<Types.TNetworkAddress> fe_addr_list
 }
 
 struct TMetaScanNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to