This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 826cfdaf93 [feature](information_schema) add `backends` information_schema table (#13086)
826cfdaf93 is described below

commit 826cfdaf93516d9c60daf6d042e4bfc3adaa1897
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Tue Nov 8 22:15:10 2022 +0800

    [feature](information_schema) add `backends` information_schema table (#13086)
---
 be/src/exec/CMakeLists.txt                         |   1 +
 be/src/exec/schema_scan_node.cpp                   |   5 +
 be/src/exec/schema_scanner.cpp                     |  47 +++++-
 be/src/exec/schema_scanner.h                       |  10 ++
 .../schema_scanner/schema_backends_scanner.cpp     | 165 +++++++++++++++++++++
 .../exec/schema_scanner/schema_backends_scanner.h  |  44 ++++++
 be/src/vec/exec/vschema_scan_node.cpp              |  24 ++-
 .../org/apache/doris/analysis/SchemaTableType.java |   3 +-
 .../java/org/apache/doris/catalog/SchemaTable.java |  51 ++++++-
 .../org/apache/doris/planner/SchemaScanNode.java   |   2 +
 .../apache/doris/service/FrontendServiceImpl.java  | 126 ++++++++++++++++
 gensrc/thrift/Data.thrift                          |   9 +-
 gensrc/thrift/Descriptors.thrift                   |   3 +-
 gensrc/thrift/FrontendService.thrift               |  16 ++
 gensrc/thrift/PlanNodes.thrift                     |   8 +
 .../suites/correctness/test_backends_table.groovy  |  23 +++
 16 files changed, 519 insertions(+), 18 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 1c7f2be313..2e020e2f92 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -88,6 +88,7 @@ set(EXEC_FILES
     schema_scanner/schema_files_scanner.cpp
     schema_scanner/schema_partitions_scanner.cpp
     schema_scanner/schema_rowsets_scanner.cpp
+    schema_scanner/schema_backends_scanner.cpp
 
     partitioned_hash_table.cc
     partitioned_aggregation_node.cc
diff --git a/be/src/exec/schema_scan_node.cpp b/be/src/exec/schema_scan_node.cpp
index 6a1b546ec1..e52cddb98f 100644
--- a/be/src/exec/schema_scan_node.cpp
+++ b/be/src/exec/schema_scan_node.cpp
@@ -82,6 +82,11 @@ Status SchemaScanNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
     if (tnode.schema_scan_node.__isset.thread_id) {
         _scanner_param.thread_id = tnode.schema_scan_node.thread_id;
     }
+
+    if (tnode.schema_scan_node.__isset.table_structure) {
+        _scanner_param.table_structure = _pool->add(
+                new 
std::vector<TSchemaTableStructure>(tnode.schema_scan_node.table_structure));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index fb4623114c..a452077f6c 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -17,6 +17,7 @@
 
 #include "exec/schema_scanner.h"
 
+#include "exec/schema_scanner/schema_backends_scanner.h"
 #include "exec/schema_scanner/schema_charsets_scanner.h"
 #include "exec/schema_scanner/schema_collations_scanner.h"
 #include "exec/schema_scanner/schema_columns_scanner.h"
@@ -31,6 +32,8 @@
 #include "exec/schema_scanner/schema_user_privileges_scanner.h"
 #include "exec/schema_scanner/schema_variables_scanner.h"
 #include "exec/schema_scanner/schema_views_scanner.h"
+#include "runtime/define_primitive_type.h"
+#include "runtime/string_value.h"
 
 namespace doris {
 
@@ -41,9 +44,23 @@ SchemaScanner::SchemaScanner(ColumnDesc* columns, int 
column_num)
           _param(nullptr),
           _columns(columns),
           _column_num(column_num),
-          _tuple_desc(nullptr) {}
+          _tuple_desc(nullptr),
+          _schema_table_type(TSchemaTableType::SCH_INVALID) {}
 
-SchemaScanner::~SchemaScanner() {}
+// Constructs a scanner that also records which schema table it serves.
+// init() checks `_schema_table_type` and, for SCH_BACKENDS, builds the column
+// layout from the table structure in the scanner param, so `columns` may be
+// nullptr and `column_num` 0 for that type.
+SchemaScanner::SchemaScanner(ColumnDesc* columns, int column_num, 
TSchemaTableType::type type)
+        : _is_init(false),
+          _param(nullptr),
+          _columns(columns),
+          _column_num(column_num),
+          _tuple_desc(nullptr),
+          _schema_table_type(type) {}
+
+// Releases the column array only when this scanner allocated it itself in
+// create_columns() (`_is_create_columns`); otherwise `_columns` is left alone.
+SchemaScanner::~SchemaScanner() {
+    if (!_is_create_columns || _columns == nullptr) {
+        return;
+    }
+    delete[] _columns;
+    _columns = nullptr;
+}
 
 Status SchemaScanner::start(RuntimeState* state) {
     if (!_is_init) {
@@ -70,8 +87,15 @@ Status SchemaScanner::init(SchemaScannerParam* param, 
ObjectPool* pool) {
     if (_is_init) {
         return Status::OK();
     }
+    if (nullptr == param || nullptr == pool) {
+        return Status::InternalError("invalid parameter");
+    }
+
+    if (_schema_table_type == TSchemaTableType::SCH_BACKENDS) {
+        RETURN_IF_ERROR(create_columns(param->table_structure, pool));
+    }
 
-    if (nullptr == param || nullptr == pool || nullptr == _columns) {
+    if (nullptr == _columns) {
         return Status::InternalError("invalid parameter");
     }
 
@@ -113,15 +137,30 @@ SchemaScanner* 
SchemaScanner::create(TSchemaTableType::type type) {
         return new (std::nothrow) SchemaPartitionsScanner();
     case TSchemaTableType::SCH_ROWSETS:
         return new (std::nothrow) SchemaRowsetsScanner();
+    case TSchemaTableType::SCH_BACKENDS:
+        return new (std::nothrow) SchemaBackendsScanner();
     default:
         return new (std::nothrow) SchemaDummyScanner();
         break;
     }
 }
 
+// Builds `_columns` / `_column_num` from the column layout sent by FE.
+//
+// The array is allocated here and released by ~SchemaScanner(), which is
+// signalled through `_is_create_columns`. Every `name` points into the strings
+// stored in `table_structure`, so that vector has to outlive this scanner.
+// `pool` is currently unused.
+//
+// Returns InternalError when `table_structure` is nullptr.
+Status SchemaScanner::create_columns(const std::vector<TSchemaTableStructure>* table_structure,
+                                     ObjectPool* pool) {
+    // The scan nodes only assign table_structure when the thrift field is set,
+    // so it must be validated before it is dereferenced.
+    if (table_structure == nullptr) {
+        return Status::InternalError("table structure is not set");
+    }
+
+    const size_t column_count = table_structure->size();
+    ColumnDesc* columns = new ColumnDesc[column_count];
+    for (size_t idx = 0; idx < column_count; ++idx) {
+        const TSchemaTableStructure& src = (*table_structure)[idx];
+        columns[idx].name = src.column_name.c_str();
+        columns[idx].type = thrift_to_type(src.type);
+        columns[idx].size = src.len;
+        columns[idx].is_null = src.is_null;
+    }
+
+    // Drop an array created by an earlier call so a repeated call does not leak.
+    if (_is_create_columns) {
+        delete[] _columns;
+    }
+    _columns = columns;
+    _column_num = static_cast<int>(column_count);
+    _is_create_columns = true;
+    return Status::OK();
+}
+
 Status SchemaScanner::create_tuple_desc(ObjectPool* pool) {
     int null_column = 0;
-
     for (int i = 0; i < _column_num; ++i) {
         if (_columns[i].is_null) {
             null_column++;
diff --git a/be/src/exec/schema_scanner.h b/be/src/exec/schema_scanner.h
index 2450da24d4..33b73b894c 100644
--- a/be/src/exec/schema_scanner.h
+++ b/be/src/exec/schema_scanner.h
@@ -43,6 +43,7 @@ struct SchemaScannerParam {
     const std::string* ip;                   // frontend ip
     int32_t port;                            // frontend thrift port
     int64_t thread_id;
+    const std::vector<TSchemaTableStructure>* table_structure;
 
     SchemaScannerParam()
             : db(nullptr),
@@ -68,6 +69,7 @@ public:
         int scale = -1;
     };
     SchemaScanner(ColumnDesc* columns, int column_num);
+    SchemaScanner(ColumnDesc* columns, int column_num, TSchemaTableType::type 
type);
     virtual ~SchemaScanner();
 
     // init object need information, schema etc.
@@ -84,6 +86,8 @@ public:
 
 protected:
     Status create_tuple_desc(ObjectPool* pool);
+    Status create_columns(const std::vector<TSchemaTableStructure>* 
table_structure,
+                          ObjectPool* pool);
 
     bool _is_init;
     // this is used for sub class
@@ -94,7 +98,13 @@ protected:
     int _column_num;
     TupleDescriptor* _tuple_desc;
 
+    // _is_create_columns means if ColumnDesc is created from FE.
+    // `_columns` should be deleted if _is_create_columns = true.
+    bool _is_create_columns = false;
+
     static DorisServer* _s_doris_server;
+
+    TSchemaTableType::type _schema_table_type;
 };
 
 } // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_backends_scanner.cpp 
b/be/src/exec/schema_scanner/schema_backends_scanner.cpp
new file mode 100644
index 0000000000..ec29eb60a6
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_backends_scanner.cpp
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/schema_scanner/schema_backends_scanner.h"
+
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/HeartbeatService_types.h>
+
+#include "exec/schema_scanner.h"
+#include "gen_cpp/FrontendService.h"
+#include "runtime/client_cache.h"
+#include "runtime/define_primitive_type.h"
+#include "runtime/exec_env.h"
+#include "runtime/primitive_type.h"
+#include "runtime/string_value.h"
+#include "util/thrift_rpc_helper.h"
+
+namespace doris {
+
+// Starts without a column array (nullptr, 0); SchemaScanner::init() creates the
+// columns for SCH_BACKENDS from the table structure in the scanner param.
+SchemaBackendsScanner::SchemaBackendsScanner()
+        : SchemaScanner(nullptr, 0, TSchemaTableType::SCH_BACKENDS), 
_row_idx(0) {}
+
+// Fetches the backend rows from the master FE and then registers the known
+// column types. Fails if the scanner has not been initialized yet.
+Status SchemaBackendsScanner::start(RuntimeState* state) {
+    if (_is_init) {
+        RETURN_IF_ERROR(_fetch_backends_info());
+        return _set_col_name_to_type();
+    }
+    return Status::InternalError("used before initialized.");
+}
+
+// Emits the next fetched row into `tuple`.
+// `*eos` is set to true (and `tuple` left untouched) once every row of
+// `_batch_data` has been returned; otherwise it is set to false.
+Status SchemaBackendsScanner::get_next_row(Tuple* tuple, MemPool* pool, bool* eos) {
+    if (!_is_init) {
+        return Status::InternalError("Used before initialized.");
+    }
+    const bool has_null_arg = (tuple == nullptr) || (pool == nullptr) || (eos == nullptr);
+    if (has_null_arg) {
+        return Status::InternalError("input pointer is nullptr.");
+    }
+
+    *eos = _row_idx >= _batch_data.size();
+    if (*eos) {
+        return Status::OK();
+    }
+    return _fill_one_row(tuple, pool);
+}
+
+// Clears the null-indicator bytes of `tuple`, fills every column of the
+// current row and finally advances `_row_idx` to the next row.
+Status SchemaBackendsScanner::_fill_one_row(Tuple* tuple, MemPool* pool) {
+    memset(static_cast<void*>(tuple), 0, _tuple_desc->num_null_bytes());
+    size_t column = 0;
+    while (column < _column_num) {
+        RETURN_IF_ERROR(_fill_one_col(tuple, pool, column));
+        ++column;
+    }
+    ++_row_idx;
+    return Status::OK();
+}
+
+// Writes the value of column `col_idx` of the current row (`_row_idx`) into `tuple`.
+//
+// A column whose name is not registered in `_col_name_to_type` is set to NULL
+// when it is nullable and reported as an error otherwise. String values are
+// copied into memory taken from `pool`.
+//
+// Returns InternalError for an unknown non-nullable column, or when the row
+// received from FE has no cell at `col_idx`.
+Status SchemaBackendsScanner::_fill_one_col(Tuple* tuple, MemPool* pool, size_t col_idx) {
+    const auto& column = _columns[col_idx];
+    const auto it = _col_name_to_type.find(column.name);
+
+    // if this column is not exist in BE, we fill it with `NULL`.
+    if (it == _col_name_to_type.end()) {
+        if (!column.is_null) {
+            return Status::InternalError("column {} is not found in BE, and {} is not nullable.",
+                                         column.name, column.name);
+        }
+        tuple->set_null(_tuple_desc->slots()[col_idx]->null_indicator_offset());
+        return Status::OK();
+    }
+
+    // The number of cells per row is decided by FE; never index past what was sent.
+    const auto& cells = _batch_data[_row_idx].column_value;
+    if (col_idx >= cells.size()) {
+        return Status::InternalError("row {} fetched from FE has {} cells, column index {} is out of range.",
+                                     _row_idx, cells.size(), col_idx);
+    }
+    const auto& cell = cells[col_idx];
+
+    void* slot = tuple->get_slot(_tuple_desc->slots()[col_idx]->tuple_offset());
+    switch (it->second) {
+    case TYPE_BIGINT:
+        *reinterpret_cast<int64_t*>(slot) = cell.longVal;
+        break;
+    case TYPE_INT:
+        *reinterpret_cast<int32_t*>(slot) = cell.intVal;
+        break;
+    case TYPE_VARCHAR: {
+        auto* str_slot = reinterpret_cast<StringValue*>(slot);
+        const std::string& value = cell.stringVal;
+        str_slot->ptr = reinterpret_cast<char*>(pool->allocate(value.size()));
+        str_slot->len = value.size();
+        // Skip the copy for empty strings so memcpy never sees a null destination.
+        if (str_slot->len > 0) {
+            memcpy(str_slot->ptr, value.data(), str_slot->len);
+        }
+        break;
+    }
+    case TYPE_DOUBLE:
+        // Use plain `double`: `double_t` may be a wider type on some targets,
+        // which would write past the 8-byte slot.
+        *reinterpret_cast<double*>(slot) = cell.doubleVal;
+        break;
+    default:
+        // other type
+        break;
+    }
+    return Status::OK();
+}
+
+// Asks the master FE for the rows of the `backends` schema table through the
+// fetchSchemaTableData RPC and stores them in `_batch_data`.
+// Returns the RPC error, or the status reported by FE when it is not OK.
+Status SchemaBackendsScanner::_fetch_backends_info() {
+    TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
+
+    TFetchSchemaTableDataRequest req;
+    req.schema_table_name = TSchemaTableName::BACKENDS;
+    req.__isset.schema_table_name = true;
+    req.cluster_name = "";
+    req.__isset.cluster_name = true;
+
+    // TODO(ftw): if result will too large?
+    TFetchSchemaTableDataResult res;
+    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&req, &res](FrontendServiceConnection& client) {
+                client->fetchSchemaTableData(res, req);
+            },
+            config::txn_commit_rpc_timeout_ms));
+
+    Status fe_status(res.status);
+    if (fe_status.ok()) {
+        _batch_data = std::move(res.data_batch);
+        return Status::OK();
+    }
+    LOG(WARNING) << "fetch schema table data from master failed, errmsg="
+                 << fe_status.get_error_msg();
+    return fe_status;
+}
+
+// Registers every column name this BE can fill, together with the primitive
+// type used to write it. Names that are already present are left untouched.
+Status SchemaBackendsScanner::_set_col_name_to_type() {
+    _col_name_to_type.insert({
+            {"BackendId", TYPE_BIGINT},
+            {"TabletNum", TYPE_BIGINT},
+
+            {"HeartbeatPort", TYPE_INT},
+            {"BePort", TYPE_INT},
+            {"HttpPort", TYPE_INT},
+            {"BrpcPort", TYPE_INT},
+
+            {"Cluster", TYPE_VARCHAR},
+            {"IP", TYPE_VARCHAR},
+            {"LastStartTime", TYPE_VARCHAR},
+            {"LastHeartbeat", TYPE_VARCHAR},
+            {"Alive", TYPE_VARCHAR},
+            {"SystemDecommissioned", TYPE_VARCHAR},
+            {"ClusterDecommissioned", TYPE_VARCHAR},
+
+            {"DataUsedCapacity", TYPE_BIGINT},
+            {"AvailCapacity", TYPE_BIGINT},
+            {"TotalCapacity", TYPE_BIGINT},
+
+            {"UsedPct", TYPE_DOUBLE},
+            {"MaxDiskUsedPct", TYPE_DOUBLE},
+
+            {"RemoteUsedCapacity", TYPE_BIGINT},
+
+            {"Tag", TYPE_VARCHAR},
+            {"ErrMsg", TYPE_VARCHAR},
+            {"Version", TYPE_VARCHAR},
+            {"Status", TYPE_VARCHAR},
+    });
+    return Status::OK();
+}
+} // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_backends_scanner.h 
b/be/src/exec/schema_scanner/schema_backends_scanner.h
new file mode 100644
index 0000000000..32753f568e
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_backends_scanner.h
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/schema_scanner.h"
+namespace doris {
+
+// Scanner for the `backends` schema table: start() fetches the rows from the
+// master FE, get_next_row() hands them out one tuple at a time.
+class SchemaBackendsScanner : public SchemaScanner {
+public:
+    SchemaBackendsScanner();
+    ~SchemaBackendsScanner() override = default;
+
+    // Fetches the backend rows and prepares the column name -> type map.
+    Status start(RuntimeState* state) override;
+    // Fills `tuple` with the next row; sets `*eos` once all rows were returned.
+    Status get_next_row(Tuple* tuple, MemPool* pool, bool* eos) override;
+
+private:
+    Status _fetch_backends_info();
+    Status _set_col_name_to_type();
+    Status _fill_one_row(Tuple* tuple, MemPool* pool);
+    Status _fill_one_col(Tuple* tuple, MemPool* pool, size_t idx);
+
+    // column_name -> type, set by _set_col_name_to_type()
+    std::unordered_map<std::string, PrimitiveType> _col_name_to_type;
+
+    // Rows received from FE, set by _fetch_backends_info()
+    std::vector<TRow> _batch_data;
+    // Index of the next row in `_batch_data` to emit
+    size_t _row_idx;
+};
+} // namespace doris
diff --git a/be/src/vec/exec/vschema_scan_node.cpp 
b/be/src/vec/exec/vschema_scan_node.cpp
index dfb6811cbf..0f2710b665 100644
--- a/be/src/vec/exec/vschema_scan_node.cpp
+++ b/be/src/vec/exec/vschema_scan_node.cpp
@@ -50,10 +50,10 @@ VSchemaScanNode::~VSchemaScanNode() {
     _src_tuple = nullptr;
 
     delete[] reinterpret_cast<char*>(_src_single_tuple);
-    _src_single_tuple = NULL;
+    _src_single_tuple = nullptr;
 
     delete[] reinterpret_cast<char*>(_dest_single_tuple);
-    _dest_single_tuple = NULL;
+    _dest_single_tuple = nullptr;
 }
 
 Status VSchemaScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
@@ -92,6 +92,11 @@ Status VSchemaScanNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
     if (tnode.schema_scan_node.__isset.thread_id) {
         _scanner_param.thread_id = tnode.schema_scan_node.thread_id;
     }
+
+    if (tnode.schema_scan_node.__isset.table_structure) {
+        _scanner_param.table_structure = _pool->add(
+                new 
std::vector<TSchemaTableStructure>(tnode.schema_scan_node.table_structure));
+    }
     return Status::OK();
 }
 
@@ -138,7 +143,7 @@ Status VSchemaScanNode::prepare(RuntimeState* state) {
     // new one mem pool
     _tuple_pool.reset(new (std::nothrow) MemPool());
 
-    if (nullptr == _tuple_pool.get()) {
+    if (nullptr == _tuple_pool) {
         return Status::InternalError("Allocate MemPool failed.");
     }
 
@@ -161,7 +166,7 @@ Status VSchemaScanNode::prepare(RuntimeState* state) {
     // new one scanner
     
_schema_scanner.reset(SchemaScanner::create(schema_table->schema_table_type()));
 
-    if (nullptr == _schema_scanner.get()) {
+    if (nullptr == _schema_scanner) {
         return Status::InternalError("schema scanner get nullptr pointer.");
     }
 
@@ -221,13 +226,13 @@ Status VSchemaScanNode::prepare(RuntimeState* state) {
 
     _src_single_tuple =
             reinterpret_cast<doris::Tuple*>(new (std::nothrow) 
char[_src_tuple_desc->byte_size()]);
-    if (NULL == _src_single_tuple) {
+    if (nullptr == _src_single_tuple) {
         return Status::InternalError("new src single tuple failed.");
     }
 
     _dest_single_tuple =
             reinterpret_cast<doris::Tuple*>(new (std::nothrow) 
char[_dest_tuple_desc->byte_size()]);
-    if (NULL == _dest_single_tuple) {
+    if (nullptr == _dest_single_tuple) {
         return Status::InternalError("new desc single tuple failed.");
     }
 
@@ -239,9 +244,12 @@ Status VSchemaScanNode::get_next(RuntimeState* state, 
vectorized::Block* block,
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
     VLOG_CRITICAL << "VSchemaScanNode::GetNext";
-    if (state == NULL || block == NULL || eos == NULL)
+    if (state == nullptr || block == nullptr || eos == nullptr) {
         return Status::InternalError("input is NULL pointer");
-    if (!_is_init) return Status::InternalError("used before initialize.");
+    }
+    if (!_is_init) {
+        return Status::InternalError("used before initialize.");
+    }
     RETURN_IF_CANCELLED(state);
     std::vector<vectorized::MutableColumnPtr> columns(_slot_num);
     bool schema_eos = false;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
index 43d771953f..c8df72b7de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
@@ -66,7 +66,8 @@ public enum SchemaTableType {
     SCH_VIEWS("VIEWS", "VIEWS", TSchemaTableType.SCH_VIEWS),
     SCH_CREATE_TABLE("CREATE_TABLE", "CREATE_TABLE", 
TSchemaTableType.SCH_CREATE_TABLE),
     SCH_INVALID("NULL", "NULL", TSchemaTableType.SCH_INVALID),
-    SCH_ROWSETS("ROWSETS", "ROWSETS", TSchemaTableType.SCH_ROWSETS);
+    SCH_ROWSETS("ROWSETS", "ROWSETS", TSchemaTableType.SCH_ROWSETS),
+    SCH_BACKENDS("BACKENDS", "BACKENDS", TSchemaTableType.SCH_BACKENDS);
     private static final String dbName = "INFORMATION_SCHEMA";
     private static SelectList fullSelectLists;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 6ebc85f02c..7e06a00f33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -20,6 +20,7 @@ package org.apache.doris.catalog;
 import org.apache.doris.analysis.SchemaTableType;
 import org.apache.doris.common.SystemIdGenerator;
 import org.apache.doris.thrift.TSchemaTable;
+import org.apache.doris.thrift.TSchemaTableStructure;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
@@ -399,8 +400,54 @@ public class SchemaTable extends Table {
                                     .column("CREATION_TIME", 
ScalarType.createType(PrimitiveType.BIGINT))
                                     .column("OLDEST_WRITE_TIMESTAMP", 
ScalarType.createType(PrimitiveType.BIGINT))
                                     .column("NEWEST_WRITE_TIMESTAMP", 
ScalarType.createType(PrimitiveType.BIGINT))
-                                    .build())).build();
-    private SchemaTableType schemaTableType;
+                                    .build()))
+            .put("backends", new SchemaTable(SystemIdGenerator.getNextId(), 
"backends", TableType.SCHEMA,
+                    builder().column("BackendId", 
ScalarType.createType(PrimitiveType.BIGINT))
+                            .column("Cluster", ScalarType.createVarchar(64))
+                            .column("IP", ScalarType.createVarchar(16))
+                            .column("HeartbeatPort", 
ScalarType.createType(PrimitiveType.INT))
+                            .column("BePort", 
ScalarType.createType(PrimitiveType.INT))
+                            .column("HttpPort", 
ScalarType.createType(PrimitiveType.INT))
+                            .column("BrpcPort", 
ScalarType.createType(PrimitiveType.INT))
+                            .column("LastStartTime", 
ScalarType.createVarchar(32))
+                            .column("LastHeartbeat", 
ScalarType.createVarchar(32))
+                            .column("Alive", ScalarType.createVarchar(8))
+                            .column("SystemDecommissioned", 
ScalarType.createVarchar(8))
+                            .column("ClusterDecommissioned", 
ScalarType.createVarchar(8))
+                            .column("TabletNum", 
ScalarType.createType(PrimitiveType.BIGINT))
+                            .column("DataUsedCapacity", 
ScalarType.createType(PrimitiveType.BIGINT))
+                            .column("AvailCapacity", 
ScalarType.createType(PrimitiveType.BIGINT))
+                            .column("TotalCapacity", 
ScalarType.createType(PrimitiveType.BIGINT))
+                            .column("UsedPct", 
ScalarType.createType(PrimitiveType.DOUBLE))
+                            .column("MaxDiskUsedPct", 
ScalarType.createType(PrimitiveType.DOUBLE))
+                            .column("RemoteUsedCapacity", 
ScalarType.createType(PrimitiveType.BIGINT))
+                            .column("Tag", ScalarType.createVarchar(128))
+                            .column("ErrMsg", ScalarType.createVarchar(2048))
+                            .column("Version", ScalarType.createVarchar(64))
+                            .column("Status", ScalarType.createVarchar(1024))
+                            .build()))
+            .build();
+
+    /**
+     * Converts the full schema of a schema table into its thrift column layout
+     * (name, type, slot size, nullability).
+     * Only "backends" is handled; every other table name yields an empty list.
+     */
+    public static List<TSchemaTableStructure> getTableStructure(String tableName) {
+        List<TSchemaTableStructure> structures = Lists.newArrayList();
+        if (!tableName.equals("backends")) {
+            return structures;
+        }
+        for (Column column : TABLE_MAP.get(tableName).getFullSchema()) {
+            TSchemaTableStructure structure = new TSchemaTableStructure();
+            structure.setColumnName(column.getName());
+            structure.setType(column.getDataType().toThrift());
+            structure.setLen(column.getDataType().getSlotSize());
+            structure.setIsNull(column.isAllowNull());
+            structures.add(structure);
+        }
+        return structures;
+    }
 
     protected SchemaTable(long id, String name, TableType type, List<Column> 
baseSchema) {
         super(id, name, type, baseSchema);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
index db70e0c623..4b5f5001b4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
@@ -110,6 +110,8 @@ public class SchemaScanNode extends ScanNode {
 
         TUserIdentity tCurrentUser = 
ConnectContext.get().getCurrentUserIdentity().toThrift();
         msg.schema_scan_node.setCurrentUserIdent(tCurrentUser);
+
+        
msg.schema_scan_node.setTableStructure(SchemaTable.getTableStructure(tableName));
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 53ee7dd637..57a7cb58d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.service;
 
+import org.apache.doris.alter.DecommissionType;
 import org.apache.doris.analysis.SetType;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Column;
@@ -30,6 +31,7 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.catalog.external.ExternalDatabase;
 import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.cluster.Cluster;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.AuthenticationException;
@@ -43,6 +45,7 @@ import org.apache.doris.common.ThriftServerContext;
 import org.apache.doris.common.ThriftServerEventProcessor;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InternalCatalog;
@@ -57,11 +60,13 @@ import org.apache.doris.qe.ConnectProcessor;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.VariableMgr;
+import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.FrontendServiceVersion;
+import org.apache.doris.thrift.TCell;
 import org.apache.doris.thrift.TColumnDef;
 import org.apache.doris.thrift.TColumnDesc;
 import org.apache.doris.thrift.TDescribeTableParams;
@@ -69,6 +74,8 @@ import org.apache.doris.thrift.TDescribeTableResult;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TFeResult;
 import org.apache.doris.thrift.TFetchResourceResult;
+import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
+import org.apache.doris.thrift.TFetchSchemaTableDataResult;
 import org.apache.doris.thrift.TFinishTaskRequest;
 import org.apache.doris.thrift.TFrontendPingFrontendRequest;
 import org.apache.doris.thrift.TFrontendPingFrontendResult;
@@ -99,6 +106,7 @@ import org.apache.doris.thrift.TPrivilegeStatus;
 import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TReportExecStatusResult;
 import org.apache.doris.thrift.TReportRequest;
+import org.apache.doris.thrift.TRow;
 import org.apache.doris.thrift.TS3StorageParam;
 import org.apache.doris.thrift.TShowVariableRequest;
 import org.apache.doris.thrift.TShowVariableResult;
@@ -119,9 +127,11 @@ import 
org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TxnCommitAttachment;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.gson.Gson;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -977,6 +987,122 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         return result;
     }
 
+    /**
+     * Dispatches a schema-table fetch request to the handler of the requested table.
+     *
+     * @param request names the schema table to fetch; BACKENDS is the only table handled here
+     * @return the handler's result, or a result carrying INTERNAL_ERROR when the table name is
+     *         unset or has no handler
+     */
+    @Override
+    public TFetchSchemaTableDataResult fetchSchemaTableData(TFetchSchemaTableDataRequest request) throws TException {
+        // schema_table_name is an optional Thrift field: switching on a null enum would throw a
+        // NullPointerException, so an unset name is treated like an unknown table.
+        if (request.isSetSchemaTableName()) {
+            switch (request.getSchemaTableName()) {
+                case BACKENDS:
+                    return getBackendsSchemaTable(request);
+                default:
+                    break;
+            }
+        }
+        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+        result.setStatus(new TStatus(TStatusCode.INTERNAL_ERROR));
+        return result;
+    }
+
+    /**
+     * Builds the rows of the information_schema `backends` table, one TRow per backend.
+     *
+     * <p>When {@code request.cluster_name} is non-empty only the backends of that cluster are
+     * listed; otherwise the ids come from {@code getBackendIds(false)}. An unknown cluster or a
+     * null id list produces an OK result that carries no data batch.
+     *
+     * @param request fetch request; only {@code cluster_name} is read here
+     * @return a result whose status is set on every return path
+     */
+    private TFetchSchemaTableDataResult getBackendsSchemaTable(TFetchSchemaTableDataRequest request) {
+        final SystemInfoService clusterInfoService = Env.getCurrentSystemInfo();
+        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+        // `status` is a required field of TFetchSchemaTableDataResult, so it has to be set before
+        // any return; the early exits below previously returned a result without it.
+        result.setStatus(new TStatus(TStatusCode.OK));
+
+        List<Long> backendIds = null;
+        if (!Strings.isNullOrEmpty(request.cluster_name)) {
+            final Cluster cluster = Env.getCurrentEnv().getCluster(request.cluster_name);
+            // root not in any cluster
+            if (null == cluster) {
+                return result;
+            }
+            backendIds = cluster.getBackendIdList();
+        } else {
+            backendIds = clusterInfoService.getBackendIds(false);
+            if (backendIds == null) {
+                return result;
+            }
+        }
+
+        long start = System.currentTimeMillis();
+        // Accumulates only the time spent looking up tablet counts across all backends.
+        Stopwatch watch = Stopwatch.createUnstarted();
+        // One serializer for all rows instead of a new instance per backend.
+        final Gson gson = new Gson();
+
+        List<TRow> dataBatch = Lists.newArrayList();
+        for (long backendId : backendIds) {
+            Backend backend = clusterInfoService.getBackend(backendId);
+            if (backend == null) {
+                continue;
+            }
+
+            watch.start();
+            Integer tabletNum = Env.getCurrentInvertedIndex().getTabletNumByBackendId(backendId);
+            watch.stop();
+
+            TRow trow = new TRow();
+            trow.addToColumnValue(new TCell().setLongVal(backendId));
+            trow.addToColumnValue(new TCell().setStringVal(backend.getOwnerClusterName()));
+            trow.addToColumnValue(new TCell().setStringVal(backend.getHost()));
+            // NOTE(review): the four port cells are skipped when cluster_name is set, so such rows
+            // are four cells shorter than the 23 columns the regression test expects -- confirm
+            // that the consumer of these rows tolerates the shorter row.
+            if (Strings.isNullOrEmpty(request.cluster_name)) {
+                trow.addToColumnValue(new TCell().setIntVal(backend.getHeartbeatPort()));
+                trow.addToColumnValue(new TCell().setIntVal(backend.getBePort()));
+                trow.addToColumnValue(new TCell().setIntVal(backend.getHttpPort()));
+                trow.addToColumnValue(new TCell().setIntVal(backend.getBrpcPort()));
+            }
+            trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastStartTime())));
+            trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastUpdateMs())));
+            trow.addToColumnValue(new TCell().setStringVal(String.valueOf(backend.isAlive())));
+
+            // Two flags, system decommission first and cluster decommission second; both stay
+            // "false" unless the backend is decommissioned with the matching type.
+            final boolean decommissioned = backend.isDecommissioned();
+            final boolean systemDecommissioned = decommissioned
+                    && backend.getDecommissionType() == DecommissionType.SystemDecommission;
+            final boolean clusterDecommissioned = decommissioned
+                    && backend.getDecommissionType() == DecommissionType.ClusterDecommission;
+            trow.addToColumnValue(new TCell().setStringVal(String.valueOf(systemDecommissioned)));
+            trow.addToColumnValue(new TCell().setStringVal(String.valueOf(clusterDecommissioned)));
+
+            trow.addToColumnValue(new TCell().setLongVal(tabletNum));
+
+            // capacity
+            // data used
+            trow.addToColumnValue(new TCell().setLongVal(backend.getDataUsedCapacityB()));
+
+            // available
+            long availB = backend.getAvailableCapacityB();
+            trow.addToColumnValue(new TCell().setLongVal(availB));
+
+            // total
+            long totalB = backend.getTotalCapacityB();
+            trow.addToColumnValue(new TCell().setLongVal(totalB));
+
+            // used percent; reported as 0 when the total capacity is not positive to avoid
+            // dividing by zero
+            double used = totalB <= 0 ? 0.0 : (double) (totalB - availB) * 100 / totalB;
+            trow.addToColumnValue(new TCell().setDoubleVal(used));
+            trow.addToColumnValue(new TCell().setDoubleVal(backend.getMaxDiskUsedPct() * 100));
+
+            // remote used capacity
+            trow.addToColumnValue(new TCell().setLongVal(backend.getRemoteUsedCapacityB()));
+
+            // tags
+            trow.addToColumnValue(new TCell().setStringVal(backend.getTagMapString()));
+            // err msg
+            trow.addToColumnValue(new TCell().setStringVal(backend.getHeartbeatErrMsg()));
+            // version
+            trow.addToColumnValue(new TCell().setStringVal(backend.getVersion()));
+            // status, serialized as JSON
+            trow.addToColumnValue(new TCell().setStringVal(gson.toJson(backend.getBackendStatus())));
+            dataBatch.add(trow);
+        }
+
+        // Collecting the tablet counts can be slow, so log both timings to make that observable.
+        LOG.debug("backends proc get tablet num cost: {}, total cost: {}",
+                watch.elapsed(TimeUnit.MILLISECONDS), (System.currentTimeMillis() - start));
+
+        result.setDataBatch(dataBatch);
+        return result;
+    }
+
     private TNetworkAddress getClientAddr() {
         ThriftServerContext connectionContext = 
ThriftServerEventProcessor.getConnectionContext();
         // For NonBlockingServer, we can not get client ip.
diff --git a/gensrc/thrift/Data.thrift b/gensrc/thrift/Data.thrift
index 676e22a0f2..fee713694f 100644
--- a/gensrc/thrift/Data.thrift
+++ b/gensrc/thrift/Data.thrift
@@ -47,17 +47,22 @@ struct TRowBatch {
 }
 
 // this is a union over all possible return types
-struct TColumnValue {
+struct TCell {
   // TODO: use <type>_val instead of camelcase
   1: optional bool boolVal
   2: optional i32 intVal
   3: optional i64 longVal
   4: optional double doubleVal
   5: optional string stringVal
+  // add type: date datetime
 }
 
 struct TResultRow {
-  1: list<TColumnValue> colVals
+  1: list<TCell> colVals
+}
+
+// A single row of cell values; each entry of column_value is one TCell.
+struct TRow {
+  1: optional list<TCell> column_value
+}
 
 // Serialized, self-contained version of a RowBatch (in 
be/src/runtime/row-batch.h).
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index f9517e2bfa..a3fc84571b 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -103,7 +103,8 @@ enum TSchemaTableType {
     SCH_VARIABLES,
     SCH_VIEWS,
     SCH_INVALID,
-    SCH_ROWSETS
+    SCH_ROWSETS,
+    SCH_BACKENDS
 }
 
 enum THdfsCompression {
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 5d48c1ae7e..9cf6ffb6ee 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -689,6 +689,20 @@ struct TInitExternalCtlMetaResult {
     2: optional string status;
 }
 
+// Names of the schema tables whose data can be requested through fetchSchemaTableData.
+enum TSchemaTableName{
+  BACKENDS = 0,
+}
+
+// Request of fetchSchemaTableData. Both fields are optional.
+struct TFetchSchemaTableDataRequest {
+  // cluster to restrict the result to
+  1: optional string cluster_name
+  // which schema table to fetch
+  2: optional TSchemaTableName schema_table_name
+}
+
+// Result of fetchSchemaTableData.
+struct TFetchSchemaTableDataResult {
+  // required: must be set on every result
+  1: required Status.TStatus status
+  // rows of the requested schema table
+  2: optional list<Data.TRow> data_batch;
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -725,4 +739,6 @@ service FrontendService {
 
     AgentService.TGetStoragePolicyResult refreshStoragePolicy()
     TInitExternalCtlMetaResult initExternalCtlMeta(1: 
TInitExternalCtlMetaRequest request)
+
+    TFetchSchemaTableDataResult fetchSchemaTableData(1: 
TFetchSchemaTableDataRequest request)
 }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 2df3fea6a2..f8566c2b5d 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -453,6 +453,13 @@ struct TCsvScanNode {
   10:optional map<string, TMiniLoadEtlFunction> column_function_mapping
 }
 
+// Describes one column of a schema table.
+struct TSchemaTableStructure {
+  1: optional string column_name
+  2: optional Types.TPrimitiveType type
+  // NOTE(review): presumably the byte length of the column type -- confirm
+  3: optional i64 len
+  // NOTE(review): presumably whether the column is nullable -- confirm
+  4: optional bool is_null;
+}
+
 struct TSchemaScanNode {
   1: required Types.TTupleId tuple_id
 
@@ -467,6 +474,7 @@ struct TSchemaScanNode {
   10: optional string user_ip   // deprecated
   11: optional Types.TUserIdentity current_user_ident   // to replace the user 
and user_ip
   12: optional bool show_hidden_cloumns = false
+  13: optional list<TSchemaTableStructure> table_structure
 }
 
 struct TMetaScanNode {
diff --git a/regression-test/suites/correctness/test_backends_table.groovy 
b/regression-test/suites/correctness/test_backends_table.groovy
new file mode 100644
index 0000000000..3b4771d825
--- /dev/null
+++ b/regression-test/suites/correctness/test_backends_table.groovy
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This suite tests the `backends` information_schema table.
+suite("test_backends_table") {
+    List<List<Object>> table = sql """ select * from information_schema.backends; """
+    // at least one backend row must be returned
+    assertTrue(table.size() > 0)
+    // every row has 23 columns; size() is called as a method, like above, because property
+    // access on a List is applied to its elements rather than to the list itself
+    assertTrue(table[0].size() == 23)
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to