This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9bcc5ce569d [Improvement]Add schema table backend_active_tasks (#31945) 9bcc5ce569d is described below commit 9bcc5ce569df0fc49fae792facc82b8ceb6f0f83 Author: wangbo <wan...@apache.org> AuthorDate: Sat Mar 9 10:19:07 2024 +0800 [Improvement]Add schema table backend_active_tasks (#31945) --- be/src/exec/schema_scanner.cpp | 3 + .../schema_scanner/schema_backend_active_tasks.cpp | 94 ++++++++++++++++++++++ .../schema_scanner/schema_backend_active_tasks.h | 49 +++++++++++ be/src/runtime/runtime_query_statistics_mgr.cpp | 81 +++++++++---------- be/src/runtime/runtime_query_statistics_mgr.h | 8 +- be/src/vec/exec/scan/vmeta_scanner.cpp | 22 +---- be/src/vec/exec/scan/vmeta_scanner.h | 1 - .../org/apache/doris/analysis/SchemaTableType.java | 3 +- .../doris/catalog/BuiltinTableValuedFunctions.java | 4 +- .../java/org/apache/doris/catalog/SchemaTable.java | 14 ++++ .../expressions/functions/table/ActiveBeTasks.java | 58 ------------- .../visitor/TableValuedFunctionVisitor.java | 5 -- .../planner/BackendPartitionedSchemaScanNode.java | 12 ++- .../ActiveBeTasksTableValuedFunction.java | 76 ----------------- .../doris/tablefunction/TableValuedFunctionIf.java | 2 - .../doris/datasource/RefreshCatalogTest.java | 4 +- gensrc/thrift/Descriptors.thrift | 3 +- gensrc/thrift/Types.thrift | 3 +- .../jdbc/test_mariadb_jdbc_catalog.out | 1 + .../jdbc/test_mysql_jdbc_catalog.out | 1 + .../jdbc/test_mysql_jdbc_catalog_nereids.out | 1 + .../meta_scan/test_backend_active_tasks.groovy | 43 ++++++++++ 22 files changed, 271 insertions(+), 217 deletions(-) diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp index b700d36f209..bff59130e8b 100644 --- a/be/src/exec/schema_scanner.cpp +++ b/be/src/exec/schema_scanner.cpp @@ -26,6 +26,7 @@ #include <ostream> #include <utility> +#include "exec/schema_scanner/schema_backend_active_tasks.h" #include "exec/schema_scanner/schema_charsets_scanner.h" #include "exec/schema_scanner/schema_collations_scanner.h" #include "exec/schema_scanner/schema_columns_scanner.h" @@ -149,6 +150,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type return SchemaMetadataNameIdsScanner::create_unique(); case TSchemaTableType::SCH_PROFILING: return SchemaProfilingScanner::create_unique(); + case TSchemaTableType::SCH_BACKEND_ACTIVE_TASKS: + return SchemaBackendActiveTasksScanner::create_unique(); default: return SchemaDummyScanner::create_unique(); break; diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp new file mode 100644 index 00000000000..c5f8825c2e4 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/schema_scanner/schema_backend_active_tasks.h" + +#include "runtime/exec_env.h" +#include "runtime/runtime_query_statistics_mgr.h" +#include "runtime/runtime_state.h" +#include "vec/common/string_ref.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_factory.hpp" + +namespace doris { +std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_columns = { + // name, type, size + {"BE_ID", TYPE_BIGINT, sizeof(StringRef), false}, + {"FE_HOST", TYPE_VARCHAR, sizeof(StringRef), false}, + {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false}, + {"TASK_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false}, + {"TASK_CPU_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false}, + {"SCAN_ROWS", TYPE_BIGINT, sizeof(int64_t), false}, + {"SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, + {"BE_PEAK_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, + {"CURRENT_USED_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, + {"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, + {"SHUFFLE_SEND_ROWS", TYPE_BIGINT, sizeof(int64_t), false}, +}; + +SchemaBackendActiveTasksScanner::SchemaBackendActiveTasksScanner() + : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_BACKEND_ACTIVE_TASKS) {} + +SchemaBackendActiveTasksScanner::~SchemaBackendActiveTasksScanner() {} + +Status SchemaBackendActiveTasksScanner::start(RuntimeState* state) { + _block_rows_limit = state->batch_size(); + return Status::OK(); +} + +Status SchemaBackendActiveTasksScanner::get_next_block(vectorized::Block* block, bool* eos) { + if (!_is_init) { + return Status::InternalError("Used before initialized."); + } + + if (nullptr == block || nullptr == eos) { + return Status::InternalError("input pointer is nullptr."); + } + + if (_task_stats_block == nullptr) { + _task_stats_block = vectorized::Block::create_unique(); + + for (int i = 0; i < _s_tbls_columns.size(); ++i) { + TypeDescriptor descriptor(_s_tbls_columns[i].type); + auto data_type = + vectorized::DataTypeFactory::instance().create_data_type(descriptor, true); + _task_stats_block->insert(vectorized::ColumnWithTypeAndName( + data_type->create_column(), data_type, _s_tbls_columns[i].name)); + } + + _task_stats_block->reserve(_block_rows_limit); + + ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_active_be_tasks_block( + _task_stats_block.get()); + _total_rows = _task_stats_block->rows(); + } + + if (_row_idx == _total_rows) { + *eos = true; + return Status::OK(); + } + + int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx); + vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block); + mblock.add_rows(_task_stats_block.get(), _row_idx, current_batch_rows); + _row_idx += current_batch_rows; + + *eos = _row_idx == _total_rows; + return Status::OK(); +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.h b/be/src/exec/schema_scanner/schema_backend_active_tasks.h new file mode 100644 index 00000000000..d8a2a1ffa3f --- /dev/null +++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.h @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <vector> + +#include "common/status.h" +#include "exec/schema_scanner.h" + +namespace doris { +class RuntimeState; +namespace vectorized { +class Block; +} // namespace vectorized + +class SchemaBackendActiveTasksScanner : public SchemaScanner { + ENABLE_FACTORY_CREATOR(SchemaBackendActiveTasksScanner); + +public: + SchemaBackendActiveTasksScanner(); + ~SchemaBackendActiveTasksScanner() override; + + Status start(RuntimeState* state) override; + Status get_next_block(vectorized::Block* block, bool* eos) override; + + static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns; + +private: + int _block_rows_limit = 4096; + int _row_idx = 0; + int _total_rows = 0; + std::unique_ptr<vectorized::Block> _task_stats_block = nullptr; +}; +}; // namespace doris \ No newline at end of file diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index ee09b0c30dc..9764b0f0507 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -21,6 +21,7 @@ #include "runtime/exec_env.h" #include "util/debug_util.h" #include "util/time.h" +#include "vec/core/block.h" namespace doris { @@ -199,54 +200,52 @@ void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64 } } -std::vector<TRow> RuntimeQueryStatiticsMgr::get_active_be_tasks_statistics( - std::vector<std::string> filter_columns) { +void RuntimeQueryStatiticsMgr::get_active_be_tasks_block(vectorized::Block* block) { std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock); - std::vector<TRow> table_rows; int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id; + auto insert_int_value = [&](int col_index, int64_t int_val, vectorized::Block* block) { + vectorized::MutableColumnPtr mutable_col_ptr; + mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); + auto* nullable_column = + reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get()); + vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); + reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value( + int_val); + nullable_column->get_null_map_data().emplace_back(0); + }; + + auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) { + vectorized::MutableColumnPtr mutable_col_ptr; + mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); + auto* nullable_column = + reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get()); + vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); + reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(), + str_val.size()); + nullable_column->get_null_map_data().emplace_back(0); + }; + + // block's schema come from SchemaBackendActiveTasksScanner::_s_tbls_columns for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) { - TRow trow; - TQueryStatistics tqs; qs_ctx_ptr->collect_query_statistics(&tqs); - - for (auto iter = filter_columns.begin(); iter != filter_columns.end(); iter++) { - std::string col_name = *iter; - - TCell tcell; - if (col_name == "beid") { - tcell.longVal = be_id; - } else if (col_name == "fehost") { - tcell.stringVal = qs_ctx_ptr->_fe_addr.hostname; - } else if (col_name == "queryid") { - tcell.stringVal = query_id; - } else if (col_name == "tasktimems") { - if (qs_ctx_ptr->_is_query_finished) { - tcell.longVal = qs_ctx_ptr->_query_finish_time - qs_ctx_ptr->_query_start_time; - } else { - tcell.longVal = MonotonicMillis() - qs_ctx_ptr->_query_start_time; - } - } else if (col_name == "taskcputimems") { - tcell.longVal = tqs.cpu_ms; - } else if (col_name == "scanrows") { - tcell.longVal = tqs.scan_rows; - } else if (col_name == "scanbytes") { - tcell.longVal = tqs.scan_bytes; - } else if (col_name == "bepeakmemorybytes") { - tcell.longVal = tqs.max_peak_memory_bytes; - } else if (col_name == "currentusedmemorybytes") { - tcell.longVal = tqs.current_used_memory_bytes; - } else if (col_name == "shufflesendbytes") { - tcell.longVal = tqs.shuffle_send_bytes; - } else if (col_name == "shufflesendRows") { - tcell.longVal = tqs.shuffle_send_rows; - } - trow.column_value.push_back(tcell); - } - table_rows.push_back(trow); + insert_int_value(0, be_id, block); + insert_string_value(1, qs_ctx_ptr->_fe_addr.hostname, block); + insert_string_value(2, query_id, block); + + int64_t task_time = qs_ctx_ptr->_is_query_finished + ? qs_ctx_ptr->_query_finish_time - qs_ctx_ptr->_query_start_time + : MonotonicMillis() - qs_ctx_ptr->_query_start_time; + insert_int_value(3, task_time, block); + insert_int_value(4, tqs.cpu_ms, block); + insert_int_value(5, tqs.scan_rows, block); + insert_int_value(6, tqs.scan_bytes, block); + insert_int_value(7, tqs.max_peak_memory_bytes, block); + insert_int_value(8, tqs.current_used_memory_bytes, block); + insert_int_value(9, tqs.shuffle_send_bytes, block); + insert_int_value(10, tqs.shuffle_send_rows, block); } - return table_rows; } } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h index 44badd196a3..1b3e164d48f 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.h +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -28,6 +28,10 @@ namespace doris { +namespace vectorized { +class Block; +} // namespace vectorized + class QueryStatisticsCtx { public: QueryStatisticsCtx(TNetworkAddress fe_addr) : _fe_addr(fe_addr) { @@ -68,8 +72,8 @@ public: void get_metric_map(std::string query_id, std::map<WorkloadMetricType, std::string>& metric_map); - // used for tvf active_queries - std::vector<TRow> get_active_be_tasks_statistics(std::vector<std::string> filter_columns); + // used for backend_active_tasks + void get_active_be_tasks_block(vectorized::Block* block); private: std::shared_mutex _qs_ctx_map_lock; diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp index 7d438366cf2..22545fa4dce 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.cpp +++ b/be/src/vec/exec/scan/vmeta_scanner.cpp @@ -34,7 +34,6 @@ #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" -#include "runtime/runtime_query_statistics_mgr.h" #include "runtime/runtime_state.h" #include "runtime/types.h" #include "util/thrift_rpc_helper.h" @@ -96,12 +95,7 @@ Status VMetaScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju return Status::InternalError("Logical error, VMetaScanner do not allow ColumnNullable"); } - if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ACTIVE_BE_TASKS) { - // tvf active_be_tasks fetch data in be directly, it does not need to request FE for data - RETURN_IF_ERROR(_build_active_be_tasks_data()); - } else { - RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range)); - } + RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range)); return Status::OK(); } @@ -294,20 +288,6 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { return Status::OK(); } -Status VMetaScanner::_build_active_be_tasks_data() { - std::vector<std::string> filter_columns; - for (const auto& slot : _tuple_desc->slots()) { - filter_columns.emplace_back(slot->col_name_lower_case()); - } - - std::vector<TRow> ret = - ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_active_be_tasks_statistics( - filter_columns); - _batch_data = std::move(ret); - - return Status::OK(); -} - Status VMetaScanner::_build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) { VLOG_CRITICAL << "VMetaScanner::_build_iceberg_metadata_request"; diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h index 25cb9345311..518f42ffc1c 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.h +++ b/be/src/vec/exec/scan/vmeta_scanner.h @@ -91,7 +91,6 @@ private: TFetchSchemaTableDataRequest* request); Status _build_queries_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request); - Status _build_active_be_tasks_data(); bool _meta_eos; TupleId _tuple_id; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java index 59bae154d57..7a7d547e240 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java @@ -69,7 +69,8 @@ public enum SchemaTableType { SCH_ROWSETS("ROWSETS", "ROWSETS", TSchemaTableType.SCH_ROWSETS), SCH_PARAMETERS("PARAMETERS", "PARAMETERS", TSchemaTableType.SCH_PARAMETERS), SCH_METADATA_NAME_IDS("METADATA_NAME_IDS", "METADATA_NAME_IDS", TSchemaTableType.SCH_METADATA_NAME_IDS), - SCH_PROFILING("PROFILING", "PROFILING", TSchemaTableType.SCH_PROFILING); + SCH_PROFILING("PROFILING", "PROFILING", TSchemaTableType.SCH_PROFILING), + SCH_BACKEND_ACTIVE_TASKS("BACKEND_ACTIVE_TASKS", "BACKEND_ACTIVE_TASKS", TSchemaTableType.SCH_BACKEND_ACTIVE_TASKS); private static final String dbName = "INFORMATION_SCHEMA"; private static SelectList fullSelectLists; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index acdeb683f26..ac1b31fceac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -17,7 +17,6 @@ package org.apache.doris.catalog; -import org.apache.doris.nereids.trees.expressions.functions.table.ActiveBeTasks; import org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries; import org.apache.doris.nereids.trees.expressions.functions.table.Backends; import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; @@ -60,8 +59,7 @@ public class BuiltinTableValuedFunctions implements FunctionHelper { tableValued(MvInfos.class, "mv_infos"), tableValued(Jobs.class, "jobs"), tableValued(Tasks.class, "tasks"), - tableValued(WorkloadGroups.class, "workload_groups"), - tableValued(ActiveBeTasks.class, "active_be_tasks") + tableValued(WorkloadGroups.class, "workload_groups") ); public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 7215cf0fc76..9e068418610 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -444,6 +444,20 @@ public class SchemaTable extends Table { .column("SOURCE_FILE", ScalarType.createVarchar(20)) .column("SOURCE_LINE", ScalarType.createType(PrimitiveType.INT)) .build())) + .put("backend_active_tasks", + new SchemaTable(SystemIdGenerator.getNextId(), "backend_active_tasks", TableType.SCHEMA, + builder().column("BE_ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("FE_HOST", ScalarType.createVarchar(256)) + .column("QUERY_ID", ScalarType.createVarchar(256)) + .column("TASK_TIME_MS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("TASK_CPU_TIME_MS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("SCAN_ROWS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("SCAN_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) + .column("BE_PEAK_MEMORY_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) + .column("CURRENT_USED_MEMORY_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) + .column("SHUFFLE_SEND_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) + .column("SHUFFLE_SEND_ROWS", ScalarType.createType(PrimitiveType.BIGINT)) + .build())) .build(); protected SchemaTable(long id, String name, TableType type, List<Column> baseSchema) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java deleted file mode 100644 index 5737f52a2b9..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveBeTasks.java +++ /dev/null @@ -1,58 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.trees.expressions.functions.table; - -import org.apache.doris.catalog.FunctionSignature; -import org.apache.doris.nereids.exceptions.AnalysisException; -import org.apache.doris.nereids.trees.expressions.Properties; -import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; -import org.apache.doris.nereids.types.coercion.AnyDataType; -import org.apache.doris.tablefunction.ActiveBeTasksTableValuedFunction; -import org.apache.doris.tablefunction.TableValuedFunctionIf; - -import java.util.Map; - -/** - * stands be running tasks status, currently main including select/streamload/broker load/insert select - */ -public class ActiveBeTasks extends TableValuedFunction { - - public ActiveBeTasks(Properties properties) { - super("active_be_tasks", properties); - } - - @Override - public FunctionSignature customSignature() { - return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); - } - - @Override - protected TableValuedFunctionIf toCatalogFunction() { - try { - Map<String, String> arguments = getTVFProperties().getMap(); - return new ActiveBeTasksTableValuedFunction(arguments); - } catch (Throwable t) { - throw new AnalysisException("Can not build ActiveBeTasksTableValuedFunction by " - + this + ": " + t.getMessage(), t); - } - } - - public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) { - return visitor.visitActiveBeTasks(this, context); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index 36561e5b12c..fba34d48168 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -17,7 +17,6 @@ package org.apache.doris.nereids.trees.expressions.visitor; -import org.apache.doris.nereids.trees.expressions.functions.table.ActiveBeTasks; import org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries; import org.apache.doris.nereids.trees.expressions.functions.table.Backends; import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; @@ -103,8 +102,4 @@ public interface TableValuedFunctionVisitor<R, C> { default R visitWorkloadGroups(WorkloadGroups workloadGroups, C context) { return visitTableValuedFunction(workloadGroups, context); } - - default R visitActiveBeTasks(ActiveBeTasks beTasks, C context) { - return visitTableValuedFunction(beTasks, context); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java index 592fc3c96cf..dc57b67d98a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java @@ -41,8 +41,10 @@ import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; /** * The BackendSchemaScanNode used for those SchemaTable which data are need to acquire from backends. @@ -51,10 +53,16 @@ import java.util.Map; * So, we can use partitionInfo to select the necessary `be` to send query. */ public class BackendPartitionedSchemaScanNode extends SchemaScanNode { - public static final String ROWSETS = "rowsets"; + + public static final Set<String> BACKEND_TABLE = new HashSet<>(); + + static { + BACKEND_TABLE.add("rowsets"); + BACKEND_TABLE.add("backend_active_tasks"); + } public static boolean isBackendPartitionedSchemaTable(String tableName) { - if (tableName.equalsIgnoreCase(ROWSETS)) { + if (BACKEND_TABLE.contains(tableName.toLowerCase())) { return true; } return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java deleted file mode 100644 index 99a8ba4886f..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveBeTasksTableValuedFunction.java +++ /dev/null @@ -1,76 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.tablefunction; - -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.ScalarType; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.thrift.TMetaScanRange; -import org.apache.doris.thrift.TMetadataType; - -import com.google.common.collect.ImmutableList; - -import java.util.List; -import java.util.Map; - -public class ActiveBeTasksTableValuedFunction extends MetadataTableValuedFunction { - - public static final String NAME = "active_be_tasks"; - - private static final ImmutableList<Column> SCHEMA = ImmutableList.of( - new Column("BeId", PrimitiveType.BIGINT), - new Column("FeHost", ScalarType.createStringType()), - new Column("QueryId", ScalarType.createStringType()), - new Column("TaskTimeMs", PrimitiveType.BIGINT), - new Column("TaskCpuTimeMs", PrimitiveType.BIGINT), - new Column("ScanRows", PrimitiveType.BIGINT), - new Column("ScanBytes", PrimitiveType.BIGINT), - new Column("BePeakMemoryBytes", PrimitiveType.BIGINT), - new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT), - new Column("ShuffleSendBytes", PrimitiveType.BIGINT), - new Column("ShuffleSendRows", PrimitiveType.BIGINT)); - - public ActiveBeTasksTableValuedFunction(Map<String, String> params) throws AnalysisException { - if (params.size() != 0) { - throw new AnalysisException("ActiveBeTasks table-valued-function does not support any params"); - } - } - - @Override - public TMetadataType getMetadataType() { - return TMetadataType.ACTIVE_BE_TASKS; - } - - @Override - public TMetaScanRange getMetaScanRange() { - TMetaScanRange metaScanRange = new TMetaScanRange(); - metaScanRange.setMetadataType(TMetadataType.ACTIVE_BE_TASKS); - return metaScanRange; - } - - @Override - public String getTableName() { - return "ActiveBeTasksTableValuedFunction"; - } - - @Override - public List<Column> getTableColumns() throws AnalysisException { - return SCHEMA; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index 4b755a97bf8..f9fb76a9666 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -78,8 +78,6 @@ public abstract class TableValuedFunctionIf { return new ActiveQueriesTableValuedFunction(params); case WorkloadSchedPolicyTableValuedFunction.NAME: return new WorkloadSchedPolicyTableValuedFunction(params); - case ActiveBeTasksTableValuedFunction.NAME: - return new ActiveBeTasksTableValuedFunction(params); default: throw new AnalysisException("Could not find table function " + funcName); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java index f5244005c16..5b4c15e876e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java @@ -87,7 +87,7 @@ public class RefreshCatalogTest extends TestWithFeService { List<String> dbNames2 = test1.getDbNames(); Assertions.assertEquals(4, dbNames2.size()); ExternalInfoSchemaDatabase infoDb = (ExternalInfoSchemaDatabase) test1.getDb(InfoSchemaDb.DATABASE_NAME).get(); - Assertions.assertEquals(27, infoDb.getTables().size()); + Assertions.assertEquals(28, infoDb.getTables().size()); TestExternalDatabase testDb = (TestExternalDatabase) test1.getDb("db1").get(); Assertions.assertEquals(2, testDb.getTables().size()); @@ -96,7 +96,7 @@ public class RefreshCatalogTest extends TestWithFeService { CatalogMgr mgr2 = GsonUtils.GSON.fromJson(json, CatalogMgr.class); test1 = mgr2.getCatalog("test1"); infoDb = (ExternalInfoSchemaDatabase) test1.getDb(InfoSchemaDb.DATABASE_NAME).get(); - Assertions.assertEquals(27, infoDb.getTables().size()); + Assertions.assertEquals(28, infoDb.getTables().size()); testDb = (TestExternalDatabase) test1.getDb("db1").get(); Assertions.assertEquals(2, testDb.getTables().size()); } diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 5e77729becd..4e038aecb89 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -125,7 +125,8 @@ enum TSchemaTableType { SCH_COLUMN_STATISTICS, SCH_PARAMETERS, SCH_METADATA_NAME_IDS, - SCH_PROFILING; + SCH_PROFILING, + SCH_BACKEND_ACTIVE_TASKS; } enum THdfsCompression { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 04a1fd35163..2cd6af9f050 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -715,8 +715,7 @@ enum TMetadataType { JOBS, TASKS, QUERIES, - WORKLOAD_SCHED_POLICY, - ACTIVE_BE_TASKS, + WORKLOAD_SCHED_POLICY } enum TIcebergQueryType { diff --git a/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out index 0ad76d0db4c..ff09873c18c 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out +++ b/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out @@ -23,6 +23,7 @@ mariadb_jdbc_catalog 115 abg -- !information_schema -- +backend_active_tasks character_sets collations column_privileges diff --git a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out index 3520a11d8bc..309c753f5d6 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out +++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out @@ -192,6 +192,7 @@ bca 2022-11-02 2022-11-02 8012 vivo 2 2 -- !information_schema -- +backend_active_tasks character_sets collations column_privileges diff --git a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out index a66c418fede..fcbf4f99244 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out +++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out @@ -160,6 +160,7 @@ bca 2022-11-02 2022-11-02 8012 vivo 123456789012345678901234567890123.12345 12345678901234567890123456789012.12345 1234567890123456789012345678901234.12345 123456789012345678901234567890123.12345 123456789012345678901234567890123456789012345678901234567890.12345 123456789012345678901234567890123456789012345678901234567890.12345 -- !information_schema -- +backend_active_tasks character_sets collations column_privileges diff --git a/regression-test/suites/query_p0/meta_scan/test_backend_active_tasks.groovy b/regression-test/suites/query_p0/meta_scan/test_backend_active_tasks.groovy new file mode 100644 index 00000000000..172a7bbf850 --- /dev/null +++ b/regression-test/suites/query_p0/meta_scan/test_backend_active_tasks.groovy @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_backend_active_tasks") { + def thread1 = new Thread({ + while(true) { + // non-pipeline + sql "set experimental_enable_pipeline_engine=false" + sql "set experimental_enable_pipeline_x_engine=false" + sql "select * from information_schema.backend_active_tasks" + sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS from information_schema.backend_active_tasks" + + // pipeline + sql "set experimental_enable_pipeline_engine=true" + sql "set experimental_enable_pipeline_x_engine=false" + sql "select * from information_schema.backend_active_tasks" + sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS from information_schema.backend_active_tasks" + + // pipelinex + sql "set experimental_enable_pipeline_engine=true" + sql "set experimental_enable_pipeline_x_engine=true" + sql "select * from information_schema.backend_active_tasks" + sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS from information_schema.backend_active_tasks" + Thread.sleep(1000) + } + }) + thread1.setDaemon(true) + thread1.start() +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org