This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new fc0222a64cb [opt](info) processlist schema table support show all fe (#38701) (#38953)
fc0222a64cb is described below

commit fc0222a64cb58415f05d913c724929b04fab476c
Author: wangbo <wan...@apache.org>
AuthorDate: Wed Aug 7 11:01:46 2024 +0800

    [opt](info) processlist schema table support show all fe (#38701) (#38953)
    
    pick #38701
---
 be/src/exec/schema_scanner.h                         |  1 +
 .../schema_scanner/schema_processlist_scanner.cpp    | 11 ++++++++---
 be/src/pipeline/exec/schema_scan_operator.cpp        | 11 +++++++++++
 .../java/org/apache/doris/catalog/SchemaTable.java   | 17 ++++++++++++++++-
 .../org/apache/doris/planner/SchemaScanNode.java     | 20 ++++++++++++++++++++
 gensrc/thrift/PlanNodes.thrift                       |  1 +
 6 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/be/src/exec/schema_scanner.h b/be/src/exec/schema_scanner.h
index 4666657af21..da61d58b943 100644
--- a/be/src/exec/schema_scanner.h
+++ b/be/src/exec/schema_scanner.h
@@ -69,6 +69,7 @@ struct SchemaScannerCommonParam {
     int32_t port;                                      // frontend thrift port
     int64_t thread_id;
     const std::string* catalog = nullptr;
+    std::set<TNetworkAddress> fe_addr_list;
 };
 
 // scanner parameter from frontend
diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
index f1071359d0a..c65e1d14c2c 100644
--- a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
@@ -55,9 +55,14 @@ Status SchemaProcessListScanner::start(RuntimeState* state) {
     TShowProcessListRequest request;
     request.__set_show_full_sql(true);
 
-    
RETURN_IF_ERROR(SchemaHelper::show_process_list(*(_param->common_param->ip),
-                                                    
_param->common_param->port, request,
-                                                    &_process_list_result));
+    for (const auto& fe_addr : _param->common_param->fe_addr_list) {
+        TShowProcessListResult tmp_ret;
+        RETURN_IF_ERROR(
+                SchemaHelper::show_process_list(fe_addr.hostname, 
fe_addr.port, request, &tmp_ret));
+        
_process_list_result.process_list.insert(_process_list_result.process_list.end(),
+                                                 tmp_ret.process_list.begin(),
+                                                 tmp_ret.process_list.end());
+    }
 
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp
index 8ff05cc41b7..d5353655ab0 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -133,6 +133,17 @@ Status SchemaScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
         _common_scanner_param->catalog =
                 state->obj_pool()->add(new 
std::string(tnode.schema_scan_node.catalog));
     }
+
+    if (tnode.schema_scan_node.__isset.fe_addr_list) {
+        for (const auto& fe_addr : tnode.schema_scan_node.fe_addr_list) {
+            _common_scanner_param->fe_addr_list.insert(fe_addr);
+        }
+    } else if (tnode.schema_scan_node.__isset.ip && 
tnode.schema_scan_node.__isset.port) {
+        TNetworkAddress fe_addr;
+        fe_addr.hostname = tnode.schema_scan_node.ip;
+        fe_addr.port = tnode.schema_scan_node.port;
+        _common_scanner_param->fe_addr_list.insert(fe_addr);
+    }
     return Status::OK();
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 0995884dc6f..53b00b0880a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -504,7 +504,7 @@ public class SchemaTable extends Table {
                                     .column("QUERY_ID", 
ScalarType.createVarchar(256))
                                     .column("INFO", 
ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
                                     .column("FE",
-                                            
ScalarType.createVarchar(64)).build()))
+                                            
ScalarType.createVarchar(64)).build(), true))
             .put("workload_policy",
                     new SchemaTable(SystemIdGenerator.getNextId(), 
"workload_policy", TableType.SCHEMA,
                             builder().column("ID", 
ScalarType.createType(PrimitiveType.BIGINT))
@@ -518,10 +518,17 @@ public class SchemaTable extends Table {
                                     .build()))
             .build();
 
+    private boolean fetchAllFe = false;
+
     protected SchemaTable(long id, String name, TableType type, List<Column> 
baseSchema) {
         super(id, name, type, baseSchema);
     }
 
+    protected SchemaTable(long id, String name, TableType type, List<Column> 
baseSchema, boolean fetchAllFe) {
+        this(id, name, type, baseSchema);
+        this.fetchAllFe = fetchAllFe;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         throw new UnsupportedOperationException("Do not allow to write 
SchemaTable to image.");
@@ -535,6 +542,14 @@ public class SchemaTable extends Table {
         return new Builder();
     }
 
+    public static boolean isShouldFetchAllFe(String schemaTableName) {
+        Table table = TABLE_MAP.get(schemaTableName);
+        if (table != null && table instanceof SchemaTable) {
+            return ((SchemaTable) table).fetchAllFe;
+        }
+        return false;
+    }
+
     /**
      * For TABLE_MAP.
      **/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
index 5a52c79e953..9418f4f6cf3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
@@ -19,6 +19,7 @@ package org.apache.doris.planner;
 
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.SchemaTable;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
@@ -27,6 +28,8 @@ import org.apache.doris.datasource.FederationBackendPolicy;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.system.Frontend;
+import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 import org.apache.doris.thrift.TScanRangeLocations;
@@ -38,6 +41,7 @@ import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -84,6 +88,21 @@ public class SchemaScanNode extends ScanNode {
         frontendPort = Config.rpc_port;
     }
 
+    private void setFeAddrList(TPlanNode msg) {
+        if (SchemaTable.isShouldFetchAllFe(tableName)) {
+            List<TNetworkAddress> feAddrList = new ArrayList();
+            if (ConnectContext.get().getSessionVariable().showAllFeConnection) 
{
+                List<Frontend> feList = Env.getCurrentEnv().getFrontends(null);
+                for (Frontend fe : feList) {
+                    feAddrList.add(new TNetworkAddress(fe.getHost(), 
fe.getRpcPort()));
+                }
+            } else {
+                feAddrList.add(new TNetworkAddress(frontendIP, frontendPort));
+            }
+            msg.schema_scan_node.setFeAddrList(feAddrList);
+        }
+    }
+
     @Override
     protected void toThrift(TPlanNode msg) {
         msg.node_type = TPlanNodeType.SCHEMA_SCAN_NODE;
@@ -116,6 +135,7 @@ public class SchemaScanNode extends ScanNode {
 
         TUserIdentity tCurrentUser = 
ConnectContext.get().getCurrentUserIdentity().toThrift();
         msg.schema_scan_node.setCurrentUserIdent(tCurrentUser);
+        setFeAddrList(msg);
     }
 
     @Override
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index f8a08dd708f..fc1a6e6baf5 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -672,6 +672,7 @@ struct TSchemaScanNode {
   12: optional bool show_hidden_cloumns = false
   // 13: optional list<TSchemaTableStructure> table_structure // deprecated
   14: optional string catalog
+  15: optional list<Types.TNetworkAddress> fe_addr_list
 }
 
 struct TMetaScanNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to