This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 826cfdaf93 [feature](information_schema) add `backends` information_schema table (#13086)
826cfdaf93 is described below

commit 826cfdaf93516d9c60daf6d042e4bfc3adaa1897
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Tue Nov 8 22:15:10 2022 +0800

    [feature](information_schema) add `backends` information_schema table (#13086)
---
 be/src/exec/CMakeLists.txt                          |   1 +
 be/src/exec/schema_scan_node.cpp                    |   5 +
 be/src/exec/schema_scanner.cpp                      |  47 +++++-
 be/src/exec/schema_scanner.h                        |  10 ++
 .../schema_scanner/schema_backends_scanner.cpp      | 165 +++++++++++++++++++++
 .../exec/schema_scanner/schema_backends_scanner.h   |  44 ++++++
 be/src/vec/exec/vschema_scan_node.cpp               |  24 ++-
 .../org/apache/doris/analysis/SchemaTableType.java  |   3 +-
 .../java/org/apache/doris/catalog/SchemaTable.java  |  51 ++++++-
 .../org/apache/doris/planner/SchemaScanNode.java    |   2 +
 .../apache/doris/service/FrontendServiceImpl.java   | 126 ++++++++++++++++
 gensrc/thrift/Data.thrift                           |   9 +-
 gensrc/thrift/Descriptors.thrift                    |   3 +-
 gensrc/thrift/FrontendService.thrift                |  16 ++
 gensrc/thrift/PlanNodes.thrift                      |   8 +
 .../suites/correctness/test_backends_table.groovy   |  23 +++
 16 files changed, 519 insertions(+), 18 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 1c7f2be313..2e020e2f92 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -88,6 +88,7 @@ set(EXEC_FILES
     schema_scanner/schema_files_scanner.cpp
     schema_scanner/schema_partitions_scanner.cpp
     schema_scanner/schema_rowsets_scanner.cpp
+    schema_scanner/schema_backends_scanner.cpp
 
     partitioned_hash_table.cc
     partitioned_aggregation_node.cc
diff --git a/be/src/exec/schema_scan_node.cpp b/be/src/exec/schema_scan_node.cpp
index 6a1b546ec1..e52cddb98f 100644
--- a/be/src/exec/schema_scan_node.cpp
+++ b/be/src/exec/schema_scan_node.cpp
@@ -82,6 +82,11 @@ Status SchemaScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
     if (tnode.schema_scan_node.__isset.thread_id) {
         _scanner_param.thread_id = tnode.schema_scan_node.thread_id;
     }
+
+    if (tnode.schema_scan_node.__isset.table_structure) {
+        _scanner_param.table_structure = _pool->add(
+                new std::vector<TSchemaTableStructure>(tnode.schema_scan_node.table_structure));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index fb4623114c..a452077f6c 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -17,6 +17,7 @@
 
 #include "exec/schema_scanner.h"
 
+#include "exec/schema_scanner/schema_backends_scanner.h"
 #include "exec/schema_scanner/schema_charsets_scanner.h"
 #include "exec/schema_scanner/schema_collations_scanner.h"
 #include "exec/schema_scanner/schema_columns_scanner.h"
@@ -31,6 +32,8 @@
 #include "exec/schema_scanner/schema_user_privileges_scanner.h"
 #include "exec/schema_scanner/schema_variables_scanner.h"
 #include "exec/schema_scanner/schema_views_scanner.h"
+#include "runtime/define_primitive_type.h"
+#include "runtime/string_value.h"
 
 namespace doris {
 
@@ -41,9 +44,23 @@ SchemaScanner::SchemaScanner(ColumnDesc* columns, int column_num)
           _param(nullptr),
           _columns(columns),
           _column_num(column_num),
-          _tuple_desc(nullptr) {}
+          _tuple_desc(nullptr),
+          _schema_table_type(TSchemaTableType::SCH_INVALID) {}
 
-SchemaScanner::~SchemaScanner() {}
+SchemaScanner::SchemaScanner(ColumnDesc* columns, int column_num, TSchemaTableType::type type)
+        : _is_init(false),
+          _param(nullptr),
+          _columns(columns),
+
_column_num(column_num), + _tuple_desc(nullptr), + _schema_table_type(type) {} + +SchemaScanner::~SchemaScanner() { + if (_is_create_columns == true && _columns != nullptr) { + delete[] _columns; + _columns = nullptr; + } +} Status SchemaScanner::start(RuntimeState* state) { if (!_is_init) { @@ -70,8 +87,15 @@ Status SchemaScanner::init(SchemaScannerParam* param, ObjectPool* pool) { if (_is_init) { return Status::OK(); } + if (nullptr == param || nullptr == pool) { + return Status::InternalError("invalid parameter"); + } + + if (_schema_table_type == TSchemaTableType::SCH_BACKENDS) { + RETURN_IF_ERROR(create_columns(param->table_structure, pool)); + } - if (nullptr == param || nullptr == pool || nullptr == _columns) { + if (nullptr == _columns) { return Status::InternalError("invalid parameter"); } @@ -113,15 +137,30 @@ SchemaScanner* SchemaScanner::create(TSchemaTableType::type type) { return new (std::nothrow) SchemaPartitionsScanner(); case TSchemaTableType::SCH_ROWSETS: return new (std::nothrow) SchemaRowsetsScanner(); + case TSchemaTableType::SCH_BACKENDS: + return new (std::nothrow) SchemaBackendsScanner(); default: return new (std::nothrow) SchemaDummyScanner(); break; } } +Status SchemaScanner::create_columns(const std::vector<TSchemaTableStructure>* table_structure, + ObjectPool* pool) { + _column_num = table_structure->size(); + _columns = new ColumnDesc[_column_num]; + _is_create_columns = true; + for (size_t idx = 0; idx < table_structure->size(); ++idx) { + _columns[idx].name = table_structure->at(idx).column_name.c_str(); + _columns[idx].type = thrift_to_type(table_structure->at(idx).type); + _columns[idx].size = table_structure->at(idx).len; + _columns[idx].is_null = table_structure->at(idx).is_null; + } + return Status::OK(); +} + Status SchemaScanner::create_tuple_desc(ObjectPool* pool) { int null_column = 0; - for (int i = 0; i < _column_num; ++i) { if (_columns[i].is_null) { null_column++; diff --git a/be/src/exec/schema_scanner.h b/be/src/exec/schema_scanner.h index 2450da24d4..33b73b894c 100644 --- a/be/src/exec/schema_scanner.h +++ b/be/src/exec/schema_scanner.h @@ -43,6 +43,7 @@ struct SchemaScannerParam { const std::string* ip; // frontend ip int32_t port; // frontend thrift port int64_t thread_id; + const std::vector<TSchemaTableStructure>* table_structure; SchemaScannerParam() : db(nullptr), @@ -68,6 +69,7 @@ public: int scale = -1; }; SchemaScanner(ColumnDesc* columns, int column_num); + SchemaScanner(ColumnDesc* columns, int column_num, TSchemaTableType::type type); virtual ~SchemaScanner(); // init object need information, schema etc. @@ -84,6 +86,8 @@ public: protected: Status create_tuple_desc(ObjectPool* pool); + Status create_columns(const std::vector<TSchemaTableStructure>* table_structure, + ObjectPool* pool); bool _is_init; // this is used for sub class @@ -94,7 +98,13 @@ protected: int _column_num; TupleDescriptor* _tuple_desc; + // _is_create_columns means if ColumnDesc is created from FE. + // `_columns` should be deleted if _is_create_columns = true. 
+ bool _is_create_columns = false; + static DorisServer* _s_doris_server; + + TSchemaTableType::type _schema_table_type; }; } // namespace doris diff --git a/be/src/exec/schema_scanner/schema_backends_scanner.cpp b/be/src/exec/schema_scanner/schema_backends_scanner.cpp new file mode 100644 index 0000000000..ec29eb60a6 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_backends_scanner.cpp @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/schema_scanner/schema_backends_scanner.h" + +#include <gen_cpp/Descriptors_types.h> +#include <gen_cpp/FrontendService_types.h> +#include <gen_cpp/HeartbeatService_types.h> + +#include "exec/schema_scanner.h" +#include "gen_cpp/FrontendService.h" +#include "runtime/client_cache.h" +#include "runtime/define_primitive_type.h" +#include "runtime/exec_env.h" +#include "runtime/primitive_type.h" +#include "runtime/string_value.h" +#include "util/thrift_rpc_helper.h" + +namespace doris { + +SchemaBackendsScanner::SchemaBackendsScanner() + : SchemaScanner(nullptr, 0, TSchemaTableType::SCH_BACKENDS), _row_idx(0) {} + +Status SchemaBackendsScanner::start(RuntimeState* state) { + if (!_is_init) { + return Status::InternalError("used before initialized."); + } + RETURN_IF_ERROR(_fetch_backends_info()); + RETURN_IF_ERROR(_set_col_name_to_type()); + return Status::OK(); +} + +Status SchemaBackendsScanner::get_next_row(Tuple* tuple, MemPool* pool, bool* eos) { + if (!_is_init) { + return Status::InternalError("Used before initialized."); + } + if (nullptr == tuple || nullptr == pool || nullptr == eos) { + return Status::InternalError("input pointer is nullptr."); + } + if (_row_idx >= _batch_data.size()) { + *eos = true; + return Status::OK(); + } + *eos = false; + return _fill_one_row(tuple, pool); +} + +Status SchemaBackendsScanner::_fill_one_row(Tuple* tuple, MemPool* pool) { + memset((void*)tuple, 0, _tuple_desc->num_null_bytes()); + for (size_t col_idx = 0; col_idx < _column_num; ++col_idx) { + RETURN_IF_ERROR(_fill_one_col(tuple, pool, col_idx)); + } + ++_row_idx; + return Status::OK(); +} + +Status SchemaBackendsScanner::_fill_one_col(Tuple* tuple, MemPool* pool, size_t col_idx) { + auto it = _col_name_to_type.find(_columns[col_idx].name); + + // if this column is not exist in BE, we fill it with `NULL`. 
+ if (it == _col_name_to_type.end()) { + if (_columns[col_idx].is_null) { + tuple->set_null(_tuple_desc->slots()[col_idx]->null_indicator_offset()); + } else { + return Status::InternalError("column {} is not found in BE, and {} is not nullable.", + _columns[col_idx].name, _columns[col_idx].name); + } + } else if (it->second == TYPE_BIGINT) { + void* slot = tuple->get_slot(_tuple_desc->slots()[col_idx]->tuple_offset()); + *(reinterpret_cast<int64_t*>(slot)) = _batch_data[_row_idx].column_value[col_idx].longVal; + } else if (it->second == TYPE_INT) { + void* slot = tuple->get_slot(_tuple_desc->slots()[col_idx]->tuple_offset()); + *(reinterpret_cast<int32_t*>(slot)) = _batch_data[_row_idx].column_value[col_idx].intVal; + } else if (it->second == TYPE_VARCHAR) { + void* slot = tuple->get_slot(_tuple_desc->slots()[col_idx]->tuple_offset()); + StringValue* str_slot = reinterpret_cast<StringValue*>(slot); + str_slot->ptr = + (char*)pool->allocate(_batch_data[_row_idx].column_value[col_idx].stringVal.size()); + str_slot->len = _batch_data[_row_idx].column_value[col_idx].stringVal.size(); + memcpy(str_slot->ptr, _batch_data[_row_idx].column_value[col_idx].stringVal.c_str(), + str_slot->len); + } else if (it->second == TYPE_DOUBLE) { + void* slot = tuple->get_slot(_tuple_desc->slots()[col_idx]->tuple_offset()); + *(reinterpret_cast<double_t*>(slot)) = + _batch_data[_row_idx].column_value[col_idx].doubleVal; + } else { + // other type + } + return Status::OK(); +} + +Status SchemaBackendsScanner::_fetch_backends_info() { + TFetchSchemaTableDataRequest request; + request.cluster_name = ""; + request.__isset.cluster_name = true; + request.schema_table_name = TSchemaTableName::BACKENDS; + request.__isset.schema_table_name = true; + TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + // TODO(ftw): if result will too large? 
+ TFetchSchemaTableDataResult result; + + RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->fetchSchemaTableData(result, request); + }, + config::txn_commit_rpc_timeout_ms)); + + Status status(result.status); + if (!status.ok()) { + LOG(WARNING) << "fetch schema table data from master failed, errmsg=" + << status.get_error_msg(); + return status; + } + _batch_data = std::move(result.data_batch); + return Status::OK(); +} + +Status SchemaBackendsScanner::_set_col_name_to_type() { + _col_name_to_type.emplace("BackendId", TYPE_BIGINT); + _col_name_to_type.emplace("TabletNum", TYPE_BIGINT); + + _col_name_to_type.emplace("HeartbeatPort", TYPE_INT); + _col_name_to_type.emplace("BePort", TYPE_INT); + _col_name_to_type.emplace("HttpPort", TYPE_INT); + _col_name_to_type.emplace("BrpcPort", TYPE_INT); + + _col_name_to_type.emplace("Cluster", TYPE_VARCHAR); + _col_name_to_type.emplace("IP", TYPE_VARCHAR); + _col_name_to_type.emplace("LastStartTime", TYPE_VARCHAR); + _col_name_to_type.emplace("LastHeartbeat", TYPE_VARCHAR); + _col_name_to_type.emplace("Alive", TYPE_VARCHAR); + _col_name_to_type.emplace("SystemDecommissioned", TYPE_VARCHAR); + _col_name_to_type.emplace("ClusterDecommissioned", TYPE_VARCHAR); + + _col_name_to_type.emplace("DataUsedCapacity", TYPE_BIGINT); + _col_name_to_type.emplace("AvailCapacity", TYPE_BIGINT); + _col_name_to_type.emplace("TotalCapacity", TYPE_BIGINT); + + _col_name_to_type.emplace("UsedPct", TYPE_DOUBLE); + _col_name_to_type.emplace("MaxDiskUsedPct", TYPE_DOUBLE); + + _col_name_to_type.emplace("RemoteUsedCapacity", TYPE_BIGINT); + + _col_name_to_type.emplace("Tag", TYPE_VARCHAR); + _col_name_to_type.emplace("ErrMsg", TYPE_VARCHAR); + _col_name_to_type.emplace("Version", TYPE_VARCHAR); + _col_name_to_type.emplace("Status", TYPE_VARCHAR); + return Status::OK(); +} +} // namespace doris diff --git a/be/src/exec/schema_scanner/schema_backends_scanner.h b/be/src/exec/schema_scanner/schema_backends_scanner.h new file mode 100644 index 0000000000..32753f568e --- /dev/null +++ b/be/src/exec/schema_scanner/schema_backends_scanner.h @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "exec/schema_scanner.h" +namespace doris { + +class SchemaBackendsScanner : public SchemaScanner { +public: + SchemaBackendsScanner(); + ~SchemaBackendsScanner() override = default; + + Status start(RuntimeState* state) override; + Status get_next_row(Tuple* tuple, MemPool* pool, bool* eos) override; + +private: + Status _fill_one_row(Tuple* tuple, MemPool* pool); + Status _fetch_backends_info(); + Status _fill_one_col(Tuple* tuple, MemPool* pool, size_t idx); + Status _set_col_name_to_type(); + +private: + // column_name -> type, set by _set_col_name_to_type() + std::unordered_map<std::string, PrimitiveType> _col_name_to_type; + + std::vector<TRow> _batch_data; + size_t _row_idx; +}; +} // namespace doris diff --git a/be/src/vec/exec/vschema_scan_node.cpp b/be/src/vec/exec/vschema_scan_node.cpp index dfb6811cbf..0f2710b665 100644 --- a/be/src/vec/exec/vschema_scan_node.cpp +++ b/be/src/vec/exec/vschema_scan_node.cpp @@ -50,10 +50,10 @@ VSchemaScanNode::~VSchemaScanNode() { _src_tuple = nullptr; delete[] reinterpret_cast<char*>(_src_single_tuple); - _src_single_tuple = NULL; + _src_single_tuple = nullptr; delete[] reinterpret_cast<char*>(_dest_single_tuple); - _dest_single_tuple = NULL; + _dest_single_tuple = nullptr; } Status VSchemaScanNode::init(const TPlanNode& tnode, RuntimeState* state) { @@ -92,6 +92,11 @@ Status VSchemaScanNode::init(const TPlanNode& tnode, RuntimeState* state) { if (tnode.schema_scan_node.__isset.thread_id) { _scanner_param.thread_id = tnode.schema_scan_node.thread_id; } + + if (tnode.schema_scan_node.__isset.table_structure) { + _scanner_param.table_structure = _pool->add( + new std::vector<TSchemaTableStructure>(tnode.schema_scan_node.table_structure)); + } return Status::OK(); } @@ -138,7 +143,7 @@ Status VSchemaScanNode::prepare(RuntimeState* state) { // new one mem pool _tuple_pool.reset(new (std::nothrow) MemPool()); - if (nullptr == _tuple_pool.get()) { + if (nullptr == _tuple_pool) { return Status::InternalError("Allocate MemPool failed."); } @@ -161,7 +166,7 @@ Status VSchemaScanNode::prepare(RuntimeState* state) { // new one scanner _schema_scanner.reset(SchemaScanner::create(schema_table->schema_table_type())); - if (nullptr == _schema_scanner.get()) { + if (nullptr == _schema_scanner) { return Status::InternalError("schema scanner get nullptr pointer."); } @@ -221,13 +226,13 @@ Status VSchemaScanNode::prepare(RuntimeState* state) { _src_single_tuple = reinterpret_cast<doris::Tuple*>(new (std::nothrow) char[_src_tuple_desc->byte_size()]); - if (NULL == _src_single_tuple) { + if (nullptr == _src_single_tuple) { return Status::InternalError("new src single tuple failed."); } _dest_single_tuple = reinterpret_cast<doris::Tuple*>(new (std::nothrow) char[_dest_tuple_desc->byte_size()]); - if (NULL == _dest_single_tuple) { + if (nullptr == _dest_single_tuple) { return Status::InternalError("new desc single tuple failed."); } @@ -239,9 +244,12 @@ Status VSchemaScanNode::get_next(RuntimeState* state, vectorized::Block* block, SCOPED_TIMER(_runtime_profile->total_time_counter()); VLOG_CRITICAL << "VSchemaScanNode::GetNext"; - if (state == NULL || block == NULL || eos == NULL) + if (state == nullptr || block == nullptr || eos == nullptr) { return Status::InternalError("input is NULL pointer"); - if (!_is_init) return Status::InternalError("used before initialize."); + } + if (!_is_init) { + return Status::InternalError("used before initialize."); + } RETURN_IF_CANCELLED(state); std::vector<vectorized::MutableColumnPtr> 
columns(_slot_num); bool schema_eos = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java index 43d771953f..c8df72b7de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java @@ -66,7 +66,8 @@ public enum SchemaTableType { SCH_VIEWS("VIEWS", "VIEWS", TSchemaTableType.SCH_VIEWS), SCH_CREATE_TABLE("CREATE_TABLE", "CREATE_TABLE", TSchemaTableType.SCH_CREATE_TABLE), SCH_INVALID("NULL", "NULL", TSchemaTableType.SCH_INVALID), - SCH_ROWSETS("ROWSETS", "ROWSETS", TSchemaTableType.SCH_ROWSETS); + SCH_ROWSETS("ROWSETS", "ROWSETS", TSchemaTableType.SCH_ROWSETS), + SCH_BACKENDS("BACKENDS", "BACKENDS", TSchemaTableType.SCH_BACKENDS); private static final String dbName = "INFORMATION_SCHEMA"; private static SelectList fullSelectLists; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 6ebc85f02c..7e06a00f33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -20,6 +20,7 @@ package org.apache.doris.catalog; import org.apache.doris.analysis.SchemaTableType; import org.apache.doris.common.SystemIdGenerator; import org.apache.doris.thrift.TSchemaTable; +import org.apache.doris.thrift.TSchemaTableStructure; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; @@ -399,8 +400,54 @@ public class SchemaTable extends Table { .column("CREATION_TIME", ScalarType.createType(PrimitiveType.BIGINT)) .column("OLDEST_WRITE_TIMESTAMP", ScalarType.createType(PrimitiveType.BIGINT)) .column("NEWEST_WRITE_TIMESTAMP", ScalarType.createType(PrimitiveType.BIGINT)) - .build())).build(); - private SchemaTableType schemaTableType; + .build())) + .put("backends", new SchemaTable(SystemIdGenerator.getNextId(), "backends", TableType.SCHEMA, + builder().column("BackendId", ScalarType.createType(PrimitiveType.BIGINT)) + .column("Cluster", ScalarType.createVarchar(64)) + .column("IP", ScalarType.createVarchar(16)) + .column("HeartbeatPort", ScalarType.createType(PrimitiveType.INT)) + .column("BePort", ScalarType.createType(PrimitiveType.INT)) + .column("HttpPort", ScalarType.createType(PrimitiveType.INT)) + .column("BrpcPort", ScalarType.createType(PrimitiveType.INT)) + .column("LastStartTime", ScalarType.createVarchar(32)) + .column("LastHeartbeat", ScalarType.createVarchar(32)) + .column("Alive", ScalarType.createVarchar(8)) + .column("SystemDecommissioned", ScalarType.createVarchar(8)) + .column("ClusterDecommissioned", ScalarType.createVarchar(8)) + .column("TabletNum", ScalarType.createType(PrimitiveType.BIGINT)) + .column("DataUsedCapacity", ScalarType.createType(PrimitiveType.BIGINT)) + .column("AvailCapacity", ScalarType.createType(PrimitiveType.BIGINT)) + .column("TotalCapacity", ScalarType.createType(PrimitiveType.BIGINT)) + .column("UsedPct", ScalarType.createType(PrimitiveType.DOUBLE)) + .column("MaxDiskUsedPct", ScalarType.createType(PrimitiveType.DOUBLE)) + .column("RemoteUsedCapacity", ScalarType.createType(PrimitiveType.BIGINT)) + .column("Tag", ScalarType.createVarchar(128)) + .column("ErrMsg", ScalarType.createVarchar(2048)) + .column("Version", ScalarType.createVarchar(64)) + .column("Status", ScalarType.createVarchar(1024)) + .build())) + .build(); + + public 
static List<TSchemaTableStructure> getTableStructure(String tableName) { + List<TSchemaTableStructure> tSchemaTableStructureList = Lists.newArrayList(); + switch (tableName) { + case "backends": { + Table table = TABLE_MAP.get(tableName); + for (Column column : table.getFullSchema()) { + TSchemaTableStructure tSchemaTableStructure = new TSchemaTableStructure(); + tSchemaTableStructure.setColumnName(column.getName()); + tSchemaTableStructure.setType(column.getDataType().toThrift()); + tSchemaTableStructure.setLen(column.getDataType().getSlotSize()); + tSchemaTableStructure.setIsNull(column.isAllowNull()); + tSchemaTableStructureList.add(tSchemaTableStructure); + } + break; + } + default: + break; + } + return tSchemaTableStructureList; + } protected SchemaTable(long id, String name, TableType type, List<Column> baseSchema) { super(id, name, type, baseSchema); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java index db70e0c623..4b5f5001b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java @@ -110,6 +110,8 @@ public class SchemaScanNode extends ScanNode { TUserIdentity tCurrentUser = ConnectContext.get().getCurrentUserIdentity().toThrift(); msg.schema_scan_node.setCurrentUserIdent(tCurrentUser); + + msg.schema_scan_node.setTableStructure(SchemaTable.getTableStructure(tableName)); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 53ee7dd637..57a7cb58d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -17,6 +17,7 @@ package org.apache.doris.service; +import org.apache.doris.alter.DecommissionType; import org.apache.doris.analysis.SetType; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Column; @@ -30,6 +31,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.external.ExternalDatabase; import org.apache.doris.catalog.external.ExternalTable; +import org.apache.doris.cluster.Cluster; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.AuthenticationException; @@ -43,6 +45,7 @@ import org.apache.doris.common.ThriftServerContext; import org.apache.doris.common.ThriftServerEventProcessor; import org.apache.doris.common.UserException; import org.apache.doris.common.Version; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InternalCatalog; @@ -57,11 +60,13 @@ import org.apache.doris.qe.ConnectProcessor; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.VariableMgr; +import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.FrontendServiceVersion; +import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TColumnDef; import org.apache.doris.thrift.TColumnDesc; import 
org.apache.doris.thrift.TDescribeTableParams; @@ -69,6 +74,8 @@ import org.apache.doris.thrift.TDescribeTableResult; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TFeResult; import org.apache.doris.thrift.TFetchResourceResult; +import org.apache.doris.thrift.TFetchSchemaTableDataRequest; +import org.apache.doris.thrift.TFetchSchemaTableDataResult; import org.apache.doris.thrift.TFinishTaskRequest; import org.apache.doris.thrift.TFrontendPingFrontendRequest; import org.apache.doris.thrift.TFrontendPingFrontendResult; @@ -99,6 +106,7 @@ import org.apache.doris.thrift.TPrivilegeStatus; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; import org.apache.doris.thrift.TReportRequest; +import org.apache.doris.thrift.TRow; import org.apache.doris.thrift.TS3StorageParam; import org.apache.doris.thrift.TShowVariableRequest; import org.apache.doris.thrift.TShowVariableResult; @@ -119,9 +127,11 @@ import org.apache.doris.transaction.TransactionState.TxnSourceType; import org.apache.doris.transaction.TxnCommitAttachment; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.gson.Gson; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -977,6 +987,122 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } + @Override + public TFetchSchemaTableDataResult fetchSchemaTableData(TFetchSchemaTableDataRequest request) throws TException { + switch (request.getSchemaTableName()) { + case BACKENDS: + return getBackendsSchemaTable(request); + default: + break; + } + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + result.setStatus(new TStatus(TStatusCode.INTERNAL_ERROR)); + return result; + } + + private TFetchSchemaTableDataResult getBackendsSchemaTable(TFetchSchemaTableDataRequest request) { + final SystemInfoService clusterInfoService = Env.getCurrentSystemInfo(); + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + List<Long> backendIds = null; + if (!Strings.isNullOrEmpty(request.cluster_name)) { + final Cluster cluster = Env.getCurrentEnv().getCluster(request.cluster_name); + // root not in any cluster + if (null == cluster) { + return result; + } + backendIds = cluster.getBackendIdList(); + } else { + backendIds = clusterInfoService.getBackendIds(false); + if (backendIds == null) { + return result; + } + } + + long start = System.currentTimeMillis(); + Stopwatch watch = Stopwatch.createUnstarted(); + + List<TRow> dataBatch = Lists.newArrayList(); + for (long backendId : backendIds) { + Backend backend = clusterInfoService.getBackend(backendId); + if (backend == null) { + continue; + } + + watch.start(); + Integer tabletNum = Env.getCurrentInvertedIndex().getTabletNumByBackendId(backendId); + watch.stop(); + + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setLongVal(backendId)); + trow.addToColumnValue(new TCell().setStringVal(backend.getOwnerClusterName())); + trow.addToColumnValue(new TCell().setStringVal(backend.getHost())); + if (Strings.isNullOrEmpty(request.cluster_name)) { + trow.addToColumnValue(new TCell().setIntVal(backend.getHeartbeatPort())); + trow.addToColumnValue(new TCell().setIntVal(backend.getBePort())); + trow.addToColumnValue(new 
TCell().setIntVal(backend.getHttpPort())); + trow.addToColumnValue(new TCell().setIntVal(backend.getBrpcPort())); + } + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastStartTime()))); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastUpdateMs()))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(backend.isAlive()))); + if (backend.isDecommissioned() && backend.getDecommissionType() == DecommissionType.ClusterDecommission) { + trow.addToColumnValue(new TCell().setStringVal("false")); + trow.addToColumnValue(new TCell().setStringVal("true")); + } else if (backend.isDecommissioned() + && backend.getDecommissionType() == DecommissionType.SystemDecommission) { + trow.addToColumnValue(new TCell().setStringVal("true")); + trow.addToColumnValue(new TCell().setStringVal("false")); + } else { + trow.addToColumnValue(new TCell().setStringVal("false")); + trow.addToColumnValue(new TCell().setStringVal("false")); + } + trow.addToColumnValue(new TCell().setLongVal(tabletNum)); + + // capacity + // data used + trow.addToColumnValue(new TCell().setLongVal(backend.getDataUsedCapacityB())); + + // available + long availB = backend.getAvailableCapacityB(); + trow.addToColumnValue(new TCell().setLongVal(availB)); + + // total + long totalB = backend.getTotalCapacityB(); + trow.addToColumnValue(new TCell().setLongVal(totalB)); + + // used percent + double used = 0.0; + if (totalB <= 0) { + used = 0.0; + } else { + used = (double) (totalB - availB) * 100 / totalB; + } + trow.addToColumnValue(new TCell().setDoubleVal(used)); + trow.addToColumnValue(new TCell().setDoubleVal(backend.getMaxDiskUsedPct() * 100)); + + // remote used capacity + trow.addToColumnValue(new TCell().setLongVal(backend.getRemoteUsedCapacityB())); + + // tags + trow.addToColumnValue(new TCell().setStringVal(backend.getTagMapString())); + // err msg + trow.addToColumnValue(new TCell().setStringVal(backend.getHeartbeatErrMsg())); + // version + trow.addToColumnValue(new TCell().setStringVal(backend.getVersion())); + // status + trow.addToColumnValue(new TCell().setStringVal(new Gson().toJson(backend.getBackendStatus()))); + dataBatch.add(trow); + } + + // backends proc node get result too slow, add log to observer. + LOG.debug("backends proc get tablet num cost: {}, total cost: {}", + watch.elapsed(TimeUnit.MILLISECONDS), (System.currentTimeMillis() - start)); + + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } + private TNetworkAddress getClientAddr() { ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext(); // For NonBlockingServer, we can not get client ip. diff --git a/gensrc/thrift/Data.thrift b/gensrc/thrift/Data.thrift index 676e22a0f2..fee713694f 100644 --- a/gensrc/thrift/Data.thrift +++ b/gensrc/thrift/Data.thrift @@ -47,17 +47,22 @@ struct TRowBatch { } // this is a union over all possible return types -struct TColumnValue { +struct TCell { // TODO: use <type>_val instead of camelcase 1: optional bool boolVal 2: optional i32 intVal 3: optional i64 longVal 4: optional double doubleVal 5: optional string stringVal + // add type: date datetime } struct TResultRow { - 1: list<TColumnValue> colVals + 1: list<TCell> colVals +} + +struct TRow { + 1: optional list<TCell> column_value } // Serialized, self-contained version of a RowBatch (in be/src/runtime/row-batch.h). 
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index f9517e2bfa..a3fc84571b 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -103,7 +103,8 @@ enum TSchemaTableType { SCH_VARIABLES, SCH_VIEWS, SCH_INVALID, - SCH_ROWSETS + SCH_ROWSETS, + SCH_BACKENDS } enum THdfsCompression { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 5d48c1ae7e..9cf6ffb6ee 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -689,6 +689,20 @@ struct TInitExternalCtlMetaResult { 2: optional string status; } +enum TSchemaTableName{ + BACKENDS = 0, +} + +struct TFetchSchemaTableDataRequest { + 1: optional string cluster_name + 2: optional TSchemaTableName schema_table_name +} + +struct TFetchSchemaTableDataResult { + 1: required Status.TStatus status + 2: optional list<Data.TRow> data_batch; +} + service FrontendService { TGetDbsResult getDbNames(1: TGetDbsParams params) TGetTablesResult getTableNames(1: TGetTablesParams params) @@ -725,4 +739,6 @@ service FrontendService { AgentService.TGetStoragePolicyResult refreshStoragePolicy() TInitExternalCtlMetaResult initExternalCtlMeta(1: TInitExternalCtlMetaRequest request) + + TFetchSchemaTableDataResult fetchSchemaTableData(1: TFetchSchemaTableDataRequest request) } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 2df3fea6a2..f8566c2b5d 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -453,6 +453,13 @@ struct TCsvScanNode { 10:optional map<string, TMiniLoadEtlFunction> column_function_mapping } +struct TSchemaTableStructure { + 1: optional string column_name + 2: optional Types.TPrimitiveType type + 3: optional i64 len + 4: optional bool is_null; +} + struct TSchemaScanNode { 1: required Types.TTupleId tuple_id @@ -467,6 +474,7 @@ struct TSchemaScanNode { 10: optional string user_ip // deprecated 11: optional Types.TUserIdentity current_user_ident // to replace the user and user_ip 12: optional bool show_hidden_cloumns = false + 13: optional list<TSchemaTableStructure> table_structure } struct TMetaScanNode { diff --git a/regression-test/suites/correctness/test_backends_table.groovy b/regression-test/suites/correctness/test_backends_table.groovy new file mode 100644 index 0000000000..3b4771d825 --- /dev/null +++ b/regression-test/suites/correctness/test_backends_table.groovy @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+// This suite tests the `backends` information_schema table
+suite("test_backends_table") {
+    List<List<Object>> table = sql """ select * from information_schema.backends; """
+    assertTrue(table.size() > 0) // row count should be > 0
+    assertTrue(table[0].size == 23) // column count should be 23
+}
\ No newline at end of file
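
For readers who want to try the feature, the new table can be queried like any
other information_schema table once a cluster is running this build. A minimal
sketch, not part of the patch itself (the column names follow the SchemaTable
definition added above; the rows returned depend on the cluster):

    -- list each backend with a few of its capacity columns
    SELECT BackendId, IP, HeartbeatPort, Alive, TabletNum,
           TotalCapacity, AvailCapacity, UsedPct
    FROM information_schema.backends
    ORDER BY BackendId;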