This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bcc5ce569d [Improvement]Add schema table backend_active_tasks (#31945)
9bcc5ce569d is described below

commit 9bcc5ce569df0fc49fae792facc82b8ceb6f0f83
Author: wangbo <wan...@apache.org>
AuthorDate: Sat Mar 9 10:19:07 2024 +0800

    [Improvement]Add schema table backend_active_tasks (#31945)
---
 be/src/exec/schema_scanner.cpp                     |  3 +
 .../schema_scanner/schema_backend_active_tasks.cpp | 94 ++++++++++++++++++++++
 .../schema_scanner/schema_backend_active_tasks.h   | 49 +++++++++++
 be/src/runtime/runtime_query_statistics_mgr.cpp    | 81 +++++++++----------
 be/src/runtime/runtime_query_statistics_mgr.h      |  8 +-
 be/src/vec/exec/scan/vmeta_scanner.cpp             | 22 +----
 be/src/vec/exec/scan/vmeta_scanner.h               |  1 -
 .../org/apache/doris/analysis/SchemaTableType.java |  3 +-
 .../doris/catalog/BuiltinTableValuedFunctions.java |  4 +-
 .../java/org/apache/doris/catalog/SchemaTable.java | 14 ++++
 .../expressions/functions/table/ActiveBeTasks.java | 58 -------------
 .../visitor/TableValuedFunctionVisitor.java        |  5 --
 .../planner/BackendPartitionedSchemaScanNode.java  | 12 ++-
 .../ActiveBeTasksTableValuedFunction.java          | 76 -----------------
 .../doris/tablefunction/TableValuedFunctionIf.java |  2 -
 .../doris/datasource/RefreshCatalogTest.java       |  4 +-
 gensrc/thrift/Descriptors.thrift                   |  3 +-
 gensrc/thrift/Types.thrift                         |  3 +-
 .../jdbc/test_mariadb_jdbc_catalog.out             |  1 +
 .../jdbc/test_mysql_jdbc_catalog.out               |  1 +
 .../jdbc/test_mysql_jdbc_catalog_nereids.out       |  1 +
 .../meta_scan/test_backend_active_tasks.groovy     | 43 ++++++++++
 22 files changed, 271 insertions(+), 217 deletions(-)

diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index b700d36f209..bff59130e8b 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -26,6 +26,7 @@
 #include <ostream>
 #include <utility>
 
+#include "exec/schema_scanner/schema_backend_active_tasks.h"
 #include "exec/schema_scanner/schema_charsets_scanner.h"
 #include "exec/schema_scanner/schema_collations_scanner.h"
 #include "exec/schema_scanner/schema_columns_scanner.h"
@@ -149,6 +150,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
         return SchemaMetadataNameIdsScanner::create_unique();
     case TSchemaTableType::SCH_PROFILING:
         return SchemaProfilingScanner::create_unique();
+    case TSchemaTableType::SCH_BACKEND_ACTIVE_TASKS:
+        return SchemaBackendActiveTasksScanner::create_unique();
     default:
         return SchemaDummyScanner::create_unique();
         break;
diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
new file mode 100644
index 00000000000..c5f8825c2e4
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/schema_scanner/schema_backend_active_tasks.h"
+
+#include "runtime/exec_env.h"
+#include "runtime/runtime_query_statistics_mgr.h"
+#include "runtime/runtime_state.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_factory.hpp"
+
+namespace doris {
+std::vector<SchemaScanner::ColumnDesc> 
SchemaBackendActiveTasksScanner::_s_tbls_columns = {
+        //   name,       type,          size
+        {"BE_ID", TYPE_BIGINT, sizeof(StringRef), false},
+        {"FE_HOST", TYPE_VARCHAR, sizeof(StringRef), false},
+        {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
+        {"TASK_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
+        {"TASK_CPU_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
+        {"SCAN_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
+        {"SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
+        {"BE_PEAK_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
+        {"CURRENT_USED_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
+        {"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
+        {"SHUFFLE_SEND_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
+};
+
+SchemaBackendActiveTasksScanner::SchemaBackendActiveTasksScanner()
+        : SchemaScanner(_s_tbls_columns, 
TSchemaTableType::SCH_BACKEND_ACTIVE_TASKS) {}
+
+SchemaBackendActiveTasksScanner::~SchemaBackendActiveTasksScanner() {}
+
+Status SchemaBackendActiveTasksScanner::start(RuntimeState* state) {
+    _block_rows_limit = state->batch_size();
+    return Status::OK();
+}
+
+Status SchemaBackendActiveTasksScanner::get_next_block(vectorized::Block* 
block, bool* eos) {
+    if (!_is_init) {
+        return Status::InternalError("Used before initialized.");
+    }
+
+    if (nullptr == block || nullptr == eos) {
+        return Status::InternalError("input pointer is nullptr.");
+    }
+
+    if (_task_stats_block == nullptr) {
+        _task_stats_block = vectorized::Block::create_unique();
+
+        for (int i = 0; i < _s_tbls_columns.size(); ++i) {
+            TypeDescriptor descriptor(_s_tbls_columns[i].type);
+            auto data_type =
+                    
vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
+            _task_stats_block->insert(vectorized::ColumnWithTypeAndName(
+                    data_type->create_column(), data_type, 
_s_tbls_columns[i].name));
+        }
+
+        _task_stats_block->reserve(_block_rows_limit);
+
+        
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_active_be_tasks_block(
+                _task_stats_block.get());
+        _total_rows = _task_stats_block->rows();
+    }
+
+    if (_row_idx == _total_rows) {
+        *eos = true;
+        return Status::OK();
+    }
+
+    int current_batch_rows = std::min(_block_rows_limit, _total_rows - 
_row_idx);
+    vectorized::MutableBlock mblock = 
vectorized::MutableBlock::build_mutable_block(block);
+    mblock.add_rows(_task_stats_block.get(), _row_idx, current_batch_rows);
+    _row_idx += current_batch_rows;
+
+    *eos = _row_idx == _total_rows;
+    return Status::OK();
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.h b/be/src/exec/schema_scanner/schema_backend_active_tasks.h
new file mode 100644
index 00000000000..d8a2a1ffa3f
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.h
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "common/status.h"
+#include "exec/schema_scanner.h"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+class SchemaBackendActiveTasksScanner : public SchemaScanner {
+    ENABLE_FACTORY_CREATOR(SchemaBackendActiveTasksScanner);
+
+public:
+    SchemaBackendActiveTasksScanner();
+    ~SchemaBackendActiveTasksScanner() override;
+
+    Status start(RuntimeState* state) override;
+    Status get_next_block(vectorized::Block* block, bool* eos) override;
+
+    static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
+
+private:
+    int _block_rows_limit = 4096;
+    int _row_idx = 0;
+    int _total_rows = 0;
+    std::unique_ptr<vectorized::Block> _task_stats_block = nullptr;
+};
+}; // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp
index ee09b0c30dc..9764b0f0507 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -21,6 +21,7 @@
 #include "runtime/exec_env.h"
 #include "util/debug_util.h"
 #include "util/time.h"
+#include "vec/core/block.h"
 
 namespace doris {
 
@@ -199,54 +200,52 @@ void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64
     }
 }
 
-std::vector<TRow> RuntimeQueryStatiticsMgr::get_active_be_tasks_statistics(
-        std::vector<std::string> filter_columns) {
+void RuntimeQueryStatiticsMgr::get_active_be_tasks_block(vectorized::Block* 
block) {
     std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
-    std::vector<TRow> table_rows;
     int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
 
+    auto insert_int_value = [&](int col_index, int64_t int_val, 
vectorized::Block* block) {
+        vectorized::MutableColumnPtr mutable_col_ptr;
+        mutable_col_ptr = 
std::move(*block->get_by_position(col_index).column).assume_mutable();
+        auto* nullable_column =
+                
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
+        vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
+        
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(
+                int_val);
+        nullable_column->get_null_map_data().emplace_back(0);
+    };
+
+    auto insert_string_value = [&](int col_index, std::string str_val, 
vectorized::Block* block) {
+        vectorized::MutableColumnPtr mutable_col_ptr;
+        mutable_col_ptr = 
std::move(*block->get_by_position(col_index).column).assume_mutable();
+        auto* nullable_column =
+                
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
+        vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
+        
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(),
+                                                                          
str_val.size());
+        nullable_column->get_null_map_data().emplace_back(0);
+    };
+
+    // block's schema come from SchemaBackendActiveTasksScanner::_s_tbls_columns
     for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
-        TRow trow;
-
         TQueryStatistics tqs;
         qs_ctx_ptr->collect_query_statistics(&tqs);
-
-        for (auto iter = filter_columns.begin(); iter != filter_columns.end(); 
iter++) {
-            std::string col_name = *iter;
-
-            TCell tcell;
-            if (col_name == "beid") {
-                tcell.longVal = be_id;
-            } else if (col_name == "fehost") {
-                tcell.stringVal = qs_ctx_ptr->_fe_addr.hostname;
-            } else if (col_name == "queryid") {
-                tcell.stringVal = query_id;
-            } else if (col_name == "tasktimems") {
-                if (qs_ctx_ptr->_is_query_finished) {
-                    tcell.longVal = qs_ctx_ptr->_query_finish_time - 
qs_ctx_ptr->_query_start_time;
-                } else {
-                    tcell.longVal = MonotonicMillis() - 
qs_ctx_ptr->_query_start_time;
-                }
-            } else if (col_name == "taskcputimems") {
-                tcell.longVal = tqs.cpu_ms;
-            } else if (col_name == "scanrows") {
-                tcell.longVal = tqs.scan_rows;
-            } else if (col_name == "scanbytes") {
-                tcell.longVal = tqs.scan_bytes;
-            } else if (col_name == "bepeakmemorybytes") {
-                tcell.longVal = tqs.max_peak_memory_bytes;
-            } else if (col_name == "currentusedmemorybytes") {
-                tcell.longVal = tqs.current_used_memory_bytes;
-            } else if (col_name == "shufflesendbytes") {
-                tcell.longVal = tqs.shuffle_send_bytes;
-            } else if (col_name == "shufflesendRows") {
-                tcell.longVal = tqs.shuffle_send_rows;
-            }
-            trow.column_value.push_back(tcell);
-        }
-        table_rows.push_back(trow);
+        insert_int_value(0, be_id, block);
+        insert_string_value(1, qs_ctx_ptr->_fe_addr.hostname, block);
+        insert_string_value(2, query_id, block);
+
+        int64_t task_time = qs_ctx_ptr->_is_query_finished
+                                    ? qs_ctx_ptr->_query_finish_time - 
qs_ctx_ptr->_query_start_time
+                                    : MonotonicMillis() - 
qs_ctx_ptr->_query_start_time;
+        insert_int_value(3, task_time, block);
+        insert_int_value(4, tqs.cpu_ms, block);
+        insert_int_value(5, tqs.scan_rows, block);
+        insert_int_value(6, tqs.scan_bytes, block);
+        insert_int_value(7, tqs.max_peak_memory_bytes, block);
+        insert_int_value(8, tqs.current_used_memory_bytes, block);
+        insert_int_value(9, tqs.shuffle_send_bytes, block);
+        insert_int_value(10, tqs.shuffle_send_rows, block);
     }
-    return table_rows;
 }
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h
index 44badd196a3..1b3e164d48f 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -28,6 +28,10 @@
 
 namespace doris {
 
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
 class QueryStatisticsCtx {
 public:
     QueryStatisticsCtx(TNetworkAddress fe_addr) : _fe_addr(fe_addr) {
@@ -68,8 +72,8 @@ public:
     void get_metric_map(std::string query_id,
                         std::map<WorkloadMetricType, std::string>& metric_map);
 
-    // used for tvf active_queries
-    std::vector<TRow> get_active_be_tasks_statistics(std::vector<std::string> 
filter_columns);
+    // used for backend_active_tasks
+    void get_active_be_tasks_block(vectorized::Block* block);
 
 private:
     std::shared_mutex _qs_ctx_map_lock;
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp
index 7d438366cf2..22545fa4dce 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -34,7 +34,6 @@
 #include "runtime/define_primitive_type.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
-#include "runtime/runtime_query_statistics_mgr.h"
 #include "runtime/runtime_state.h"
 #include "runtime/types.h"
 #include "util/thrift_rpc_helper.h"
@@ -96,12 +95,7 @@ Status VMetaScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju
         return Status::InternalError("Logical error, VMetaScanner do not allow 
ColumnNullable");
     }
 
-    if (_scan_range.meta_scan_range.metadata_type == 
TMetadataType::ACTIVE_BE_TASKS) {
-        // tvf active_be_tasks fetch data in be directly, it does not need to 
request FE for data
-        RETURN_IF_ERROR(_build_active_be_tasks_data());
-    } else {
-        RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range));
-    }
+    RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range));
     return Status::OK();
 }
 
@@ -294,20 +288,6 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
     return Status::OK();
 }
 
-Status VMetaScanner::_build_active_be_tasks_data() {
-    std::vector<std::string> filter_columns;
-    for (const auto& slot : _tuple_desc->slots()) {
-        filter_columns.emplace_back(slot->col_name_lower_case());
-    }
-
-    std::vector<TRow> ret =
-            
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_active_be_tasks_statistics(
-                    filter_columns);
-    _batch_data = std::move(ret);
-
-    return Status::OK();
-}
-
 Status VMetaScanner::_build_iceberg_metadata_request(const TMetaScanRange& 
meta_scan_range,
                                                      
TFetchSchemaTableDataRequest* request) {
     VLOG_CRITICAL << "VMetaScanner::_build_iceberg_metadata_request";
diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h
index 25cb9345311..518f42ffc1c 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.h
+++ b/be/src/vec/exec/scan/vmeta_scanner.h
@@ -91,7 +91,6 @@ private:
                                          TFetchSchemaTableDataRequest* 
request);
     Status _build_queries_metadata_request(const TMetaScanRange& 
meta_scan_range,
                                            TFetchSchemaTableDataRequest* 
request);
-    Status _build_active_be_tasks_data();
 
     bool _meta_eos;
     TupleId _tuple_id;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
index 59bae154d57..7a7d547e240 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
@@ -69,7 +69,8 @@ public enum SchemaTableType {
     SCH_ROWSETS("ROWSETS", "ROWSETS", TSchemaTableType.SCH_ROWSETS),
     SCH_PARAMETERS("PARAMETERS", "PARAMETERS", 
TSchemaTableType.SCH_PARAMETERS),
     SCH_METADATA_NAME_IDS("METADATA_NAME_IDS", "METADATA_NAME_IDS", 
TSchemaTableType.SCH_METADATA_NAME_IDS),
-    SCH_PROFILING("PROFILING", "PROFILING", TSchemaTableType.SCH_PROFILING);
+    SCH_PROFILING("PROFILING", "PROFILING", TSchemaTableType.SCH_PROFILING),
+    SCH_BACKEND_ACTIVE_TASKS("BACKEND_ACTIVE_TASKS", "BACKEND_ACTIVE_TASKS", 
TSchemaTableType.SCH_BACKEND_ACTIVE_TASKS);
     private static final String dbName = "INFORMATION_SCHEMA";
     private static SelectList fullSelectLists;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
index acdeb683f26..ac1b31fceac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.catalog;
 
-import 
org.apache.doris.nereids.trees.expressions.functions.table.ActiveBeTasks;
 import 
org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries;
 import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
 import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
@@ -60,8 +59,7 @@ public class BuiltinTableValuedFunctions implements FunctionHelper {
             tableValued(MvInfos.class, "mv_infos"),
             tableValued(Jobs.class, "jobs"),
             tableValued(Tasks.class, "tasks"),
-            tableValued(WorkloadGroups.class, "workload_groups"),
-            tableValued(ActiveBeTasks.class, "active_be_tasks")
+            tableValued(WorkloadGroups.class, "workload_groups")
     );
 
     public static final BuiltinTableValuedFunctions INSTANCE = new 
BuiltinTableValuedFunctions();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 7215cf0fc76..9e068418610 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -444,6 +444,20 @@ public class SchemaTable extends Table {
                             .column("SOURCE_FILE", 
ScalarType.createVarchar(20))
                             .column("SOURCE_LINE", 
ScalarType.createType(PrimitiveType.INT))
                             .build()))
+            .put("backend_active_tasks",
+                    new SchemaTable(SystemIdGenerator.getNextId(), 
"backend_active_tasks", TableType.SCHEMA,
+                            builder().column("BE_ID", 
ScalarType.createType(PrimitiveType.BIGINT))
+                                    .column("FE_HOST", 
ScalarType.createVarchar(256))
+                                    .column("QUERY_ID", 
ScalarType.createVarchar(256))
+                                    .column("TASK_TIME_MS", 
ScalarType.createType(PrimitiveType.BIGINT))
+                                    .column("TASK_CPU_TIME_MS", 
ScalarType.createType(PrimitiveType.BIGINT))
+                                    .column("SCAN_ROWS", 
ScalarType.createType(PrimitiveType.BIGINT))
+                                    .column("SCAN_BYTES", 
ScalarType.createType(PrimitiveType.BIGINT))
+                                    .column("BE_PEAK_MEMORY_BYTES", 
ScalarType.createType(PrimitiveType.BIGINT))
+                                    .column("CURRENT_USED_MEMORY_BYTES", 
ScalarType.createType(PrimitiveType.BIGINT))
+                                    .column("SHUFFLE_SEND_BYTES", 
ScalarType.createType(PrimitiveType.BIGINT))
+                                    .column("SHUFFLE_SEND_ROWS", 
ScalarType.createType(PrimitiveType.BIGINT))
+                                    .build()))
             .build();
 
     protected SchemaTable(long id, String name, TableType type, List<Column> 
baseSchema) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java
deleted file mode 100644
index 5737f52a2b9..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java
+++ /dev/null
@@ -1,58 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.trees.expressions.functions.table;
-
-import org.apache.doris.catalog.FunctionSignature;
-import org.apache.doris.nereids.exceptions.AnalysisException;
-import org.apache.doris.nereids.trees.expressions.Properties;
-import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
-import org.apache.doris.nereids.types.coercion.AnyDataType;
-import org.apache.doris.tablefunction.ActiveBeTasksTableValuedFunction;
-import org.apache.doris.tablefunction.TableValuedFunctionIf;
-
-import java.util.Map;
-
-/**
- *  stands be running tasks status, currently main including 
select/streamload/broker load/insert select
- */
-public class ActiveBeTasks extends TableValuedFunction {
-
-    public ActiveBeTasks(Properties properties) {
-        super("active_be_tasks", properties);
-    }
-
-    @Override
-    public FunctionSignature customSignature() {
-        return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, 
getArgumentsTypes());
-    }
-
-    @Override
-    protected TableValuedFunctionIf toCatalogFunction() {
-        try {
-            Map<String, String> arguments = getTVFProperties().getMap();
-            return new ActiveBeTasksTableValuedFunction(arguments);
-        } catch (Throwable t) {
-            throw new AnalysisException("Can not build 
ActiveBeTasksTableValuedFunction by "
-                    + this + ": " + t.getMessage(), t);
-        }
-    }
-
-    public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
-        return visitor.visitActiveBeTasks(this, context);
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
index 36561e5b12c..fba34d48168 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.nereids.trees.expressions.visitor;
 
-import 
org.apache.doris.nereids.trees.expressions.functions.table.ActiveBeTasks;
 import 
org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries;
 import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
 import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
@@ -103,8 +102,4 @@ public interface TableValuedFunctionVisitor<R, C> {
     default R visitWorkloadGroups(WorkloadGroups workloadGroups, C context) {
         return visitTableValuedFunction(workloadGroups, context);
     }
-
-    default R visitActiveBeTasks(ActiveBeTasks beTasks, C context) {
-        return visitTableValuedFunction(beTasks, context);
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
index 592fc3c96cf..dc57b67d98a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
@@ -41,8 +41,10 @@ import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * The BackendSchemaScanNode used for those SchemaTable which data are need to 
acquire from backends.
@@ -51,10 +53,16 @@ import java.util.Map;
  * So, we can use partitionInfo to select the necessary `be` to send query.
  */
 public class BackendPartitionedSchemaScanNode extends SchemaScanNode {
-    public static final String ROWSETS = "rowsets";
+
+    public static final Set<String> BACKEND_TABLE = new HashSet<>();
+
+    static {
+        BACKEND_TABLE.add("rowsets");
+        BACKEND_TABLE.add("backend_active_tasks");
+    }
 
     public static boolean isBackendPartitionedSchemaTable(String tableName) {
-        if (tableName.equalsIgnoreCase(ROWSETS)) {
+        if (BACKEND_TABLE.contains(tableName.toLowerCase())) {
             return true;
         }
         return false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java
deleted file mode 100644
index 99a8ba4886f..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java
+++ /dev/null
@@ -1,76 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.tablefunction;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.thrift.TMetaScanRange;
-import org.apache.doris.thrift.TMetadataType;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.List;
-import java.util.Map;
-
-public class ActiveBeTasksTableValuedFunction extends 
MetadataTableValuedFunction {
-
-    public static final String NAME = "active_be_tasks";
-
-    private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
-            new Column("BeId", PrimitiveType.BIGINT),
-            new Column("FeHost", ScalarType.createStringType()),
-            new Column("QueryId", ScalarType.createStringType()),
-            new Column("TaskTimeMs", PrimitiveType.BIGINT),
-            new Column("TaskCpuTimeMs", PrimitiveType.BIGINT),
-            new Column("ScanRows", PrimitiveType.BIGINT),
-            new Column("ScanBytes", PrimitiveType.BIGINT),
-            new Column("BePeakMemoryBytes", PrimitiveType.BIGINT),
-            new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT),
-            new Column("ShuffleSendBytes", PrimitiveType.BIGINT),
-            new Column("ShuffleSendRows", PrimitiveType.BIGINT));
-
-    public ActiveBeTasksTableValuedFunction(Map<String, String> params) throws 
AnalysisException {
-        if (params.size() != 0) {
-            throw new AnalysisException("ActiveBeTasks table-valued-function 
does not support any params");
-        }
-    }
-
-    @Override
-    public TMetadataType getMetadataType() {
-        return TMetadataType.ACTIVE_BE_TASKS;
-    }
-
-    @Override
-    public TMetaScanRange getMetaScanRange() {
-        TMetaScanRange metaScanRange = new TMetaScanRange();
-        metaScanRange.setMetadataType(TMetadataType.ACTIVE_BE_TASKS);
-        return metaScanRange;
-    }
-
-    @Override
-    public String getTableName() {
-        return "ActiveBeTasksTableValuedFunction";
-    }
-
-    @Override
-    public List<Column> getTableColumns() throws AnalysisException {
-        return SCHEMA;
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index 4b755a97bf8..f9fb76a9666 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -78,8 +78,6 @@ public abstract class TableValuedFunctionIf {
                 return new ActiveQueriesTableValuedFunction(params);
             case WorkloadSchedPolicyTableValuedFunction.NAME:
                 return new WorkloadSchedPolicyTableValuedFunction(params);
-            case ActiveBeTasksTableValuedFunction.NAME:
-                return new ActiveBeTasksTableValuedFunction(params);
             default:
                 throw new AnalysisException("Could not find table function " + funcName);
         }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
index f5244005c16..5b4c15e876e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java
@@ -87,7 +87,7 @@ public class RefreshCatalogTest extends TestWithFeService {
         List<String> dbNames2 = test1.getDbNames();
         Assertions.assertEquals(4, dbNames2.size());
         ExternalInfoSchemaDatabase infoDb = (ExternalInfoSchemaDatabase) test1.getDb(InfoSchemaDb.DATABASE_NAME).get();
-        Assertions.assertEquals(27, infoDb.getTables().size());
+        Assertions.assertEquals(28, infoDb.getTables().size());
         TestExternalDatabase testDb = (TestExternalDatabase) test1.getDb("db1").get();
         Assertions.assertEquals(2, testDb.getTables().size());
 
@@ -96,7 +96,7 @@ public class RefreshCatalogTest extends TestWithFeService {
         CatalogMgr mgr2 = GsonUtils.GSON.fromJson(json, CatalogMgr.class);
         test1 = mgr2.getCatalog("test1");
         infoDb = (ExternalInfoSchemaDatabase) test1.getDb(InfoSchemaDb.DATABASE_NAME).get();
-        Assertions.assertEquals(27, infoDb.getTables().size());
+        Assertions.assertEquals(28, infoDb.getTables().size());
         testDb = (TestExternalDatabase) test1.getDb("db1").get();
         Assertions.assertEquals(2, testDb.getTables().size());
     }
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 5e77729becd..4e038aecb89 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -125,7 +125,8 @@ enum TSchemaTableType {
     SCH_COLUMN_STATISTICS,
     SCH_PARAMETERS,
     SCH_METADATA_NAME_IDS,
-    SCH_PROFILING;
+    SCH_PROFILING,
+    SCH_BACKEND_ACTIVE_TASKS;
 }
 
 enum THdfsCompression {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 04a1fd35163..2cd6af9f050 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -715,8 +715,7 @@ enum TMetadataType {
   JOBS,
   TASKS,
   QUERIES,
-  WORKLOAD_SCHED_POLICY,
-  ACTIVE_BE_TASKS,
+  WORKLOAD_SCHED_POLICY
 }
 
 enum TIcebergQueryType {
diff --git a/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out
index 0ad76d0db4c..ff09873c18c 100644
--- a/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out
+++ b/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out
@@ -23,6 +23,7 @@ mariadb_jdbc_catalog
 115    abg
 
 -- !information_schema --
+backend_active_tasks
 character_sets
 collations
 column_privileges
diff --git a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
index 3520a11d8bc..309c753f5d6 100644
--- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
+++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
@@ -192,6 +192,7 @@ bca 2022-11-02      2022-11-02      8012    vivo
 2      2
 
 -- !information_schema --
+backend_active_tasks
 character_sets
 collations
 column_privileges
diff --git a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out
index a66c418fede..fcbf4f99244 100644
--- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out
+++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out
@@ -160,6 +160,7 @@ bca 2022-11-02      2022-11-02      8012    vivo
 123456789012345678901234567890123.12345        
12345678901234567890123456789012.12345  
1234567890123456789012345678901234.12345        
123456789012345678901234567890123.12345 
123456789012345678901234567890123456789012345678901234567890.12345      
123456789012345678901234567890123456789012345678901234567890.12345
 
 -- !information_schema --
+backend_active_tasks
 character_sets
 collations
 column_privileges
diff --git a/regression-test/suites/query_p0/meta_scan/test_backend_active_tasks.groovy b/regression-test/suites/query_p0/meta_scan/test_backend_active_tasks.groovy
new file mode 100644
index 00000000000..172a7bbf850
--- /dev/null
+++ b/regression-test/suites/query_p0/meta_scan/test_backend_active_tasks.groovy
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backend_active_tasks") {
+    def thread1 = new Thread({
+        while(true) {
+            // non-pipeline
+            sql "set experimental_enable_pipeline_engine=false"
+            sql "set experimental_enable_pipeline_x_engine=false"
+            sql "select * from information_schema.backend_active_tasks"
+            sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS from information_schema.backend_active_tasks"
+
+            // pipeline
+            sql "set experimental_enable_pipeline_engine=true"
+            sql "set experimental_enable_pipeline_x_engine=false"
+            sql "select * from information_schema.backend_active_tasks"
+            sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS from information_schema.backend_active_tasks"
+
+            // pipelinex
+            sql "set experimental_enable_pipeline_engine=true"
+            sql "set experimental_enable_pipeline_x_engine=true"
+            sql "select * from information_schema.backend_active_tasks"
+            sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS from information_schema.backend_active_tasks"
+            Thread.sleep(1000)
+        }
+    })
+    thread1.setDaemon(true)
+    thread1.start()
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to