This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 03c3419265ac42d103d6a802649c3d08e4d4453b Author: wangbo <wan...@apache.org> AuthorDate: Sat Apr 20 12:32:27 2024 +0800 [Refactor](executor)Add workload schedule policy table (#33729) --- be/src/exec/schema_scanner.cpp | 50 ++++++++ be/src/exec/schema_scanner.h | 4 + .../schema_workload_groups_scanner.cpp | 29 +---- .../schema_workload_sched_policy_scanner.cpp | 136 +++++++++++++++++++++ .../schema_workload_sched_policy_scanner.h | 52 ++++++++ fe/fe-core/src/main/cup/sql_parser.cup | 4 - .../org/apache/doris/analysis/SchemaTableType.java | 6 +- .../analysis/ShowWorkloadSchedPolicyStmt.java | 59 --------- .../java/org/apache/doris/catalog/SchemaTable.java | 10 ++ .../java/org/apache/doris/qe/ShowExecutor.java | 9 -- .../doris/tablefunction/MetadataGenerator.java | 19 ++- .../tablefunction/MetadataTableValuedFunction.java | 2 - .../doris/tablefunction/TableValuedFunctionIf.java | 2 - .../WorkloadSchedPolicyTableValuedFunction.java | 89 -------------- .../doris/datasource/RefreshCatalogTest.java | 4 +- gensrc/thrift/Descriptors.thrift | 3 +- gensrc/thrift/FrontendService.thrift | 1 + .../jdbc/test_mariadb_jdbc_catalog.out | 1 + .../jdbc/test_mysql_jdbc_catalog.out | 1 + .../jdbc/test_mysql_jdbc_catalog_nereids.out | 1 + .../jdbc/test_mysql_jdbc_driver5_catalog.out | 1 + .../test_workload_sched_policy.out | 2 +- .../workload_manager_p0/test_curd_wlg.groovy | 6 +- .../test_workload_sched_policy.groovy | 6 +- 24 files changed, 290 insertions(+), 207 deletions(-) diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp index 6c1aac7d0d1..5250d8f1b01 100644 --- a/be/src/exec/schema_scanner.cpp +++ b/be/src/exec/schema_scanner.cpp @@ -48,6 +48,7 @@ #include "exec/schema_scanner/schema_variables_scanner.h" #include "exec/schema_scanner/schema_views_scanner.h" #include "exec/schema_scanner/schema_workload_groups_scanner.h" +#include "exec/schema_scanner/schema_workload_sched_policy_scanner.h" #include "olap/hll.h" #include "runtime/define_primitive_type.h" #include "util/string_util.h" @@ -167,6 +168,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type return SchemaRoutinesScanner::create_unique(); case TSchemaTableType::SCH_USER: return SchemaUserScanner::create_unique(); + case TSchemaTableType::SCH_WORKLOAD_SCHEDULE_POLICY: + return SchemaWorkloadSchedulePolicyScanner::create_unique(); default: return SchemaDummyScanner::create_unique(); break; @@ -339,4 +342,51 @@ std::string SchemaScanner::get_db_from_full_name(const std::string& full_name) { return full_name; } +Status SchemaScanner::insert_block_column(TCell cell, int col_index, vectorized::Block* block, + PrimitiveType type) { + vectorized::MutableColumnPtr mutable_col_ptr; + mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); + auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get()); + vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); + + switch (type) { + case TYPE_BIGINT: { + reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value( + cell.longVal); + nullable_column->get_null_map_data().emplace_back(0); + break; + } + + case TYPE_INT: { + reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value( + cell.intVal); + nullable_column->get_null_map_data().emplace_back(0); + break; + } + + case TYPE_BOOLEAN: { + reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value( + cell.boolVal); + nullable_column->get_null_map_data().emplace_back(0); + break; + } + + case TYPE_STRING: + case TYPE_VARCHAR: + case TYPE_CHAR: { + reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(cell.stringVal.data(), + cell.stringVal.size()); + nullable_column->get_null_map_data().emplace_back(0); + break; + } + + default: { + std::stringstream ss; + ss << "unsupported column type:" << type; + return Status::InternalError(ss.str()); + } + } + return Status::OK(); +} + } // namespace doris diff --git a/be/src/exec/schema_scanner.h b/be/src/exec/schema_scanner.h index 50954999e2a..a23706ac6a4 100644 --- a/be/src/exec/schema_scanner.h +++ b/be/src/exec/schema_scanner.h @@ -17,6 +17,7 @@ #pragma once +#include <gen_cpp/Data_types.h> #include <gen_cpp/Descriptors_types.h> #include <stddef.h> #include <stdint.h> @@ -105,6 +106,9 @@ protected: Status fill_dest_column_for_range(vectorized::Block* block, size_t pos, const std::vector<void*>& datas); + Status insert_block_column(TCell cell, int col_index, vectorized::Block* block, + PrimitiveType type); + // get dbname from catalogname.dbname // if full_name does not have catalog part, just return origin name. std::string get_db_from_full_name(const std::string& full_name); diff --git a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp index 03bf9782dcd..55cdfe9cf35 100644 --- a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp @@ -103,37 +103,12 @@ Status SchemaWorkloadGroupsScanner::_get_workload_groups_block_from_fe() { } } - // todo(wb) reuse this callback function - auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(), - str_val.size()); - nullable_column->get_null_map_data().emplace_back(0); - }; - auto insert_int_value = [&](int col_index, int64_t int_val, vectorized::Block* block) { - vectorized::MutableColumnPtr mutable_col_ptr; - mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); - auto* nullable_column = - reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get()); - vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); - reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value( - int_val); - nullable_column->get_null_map_data().emplace_back(0); - }; - for (int i = 0; i < result_data.size(); i++) { TRow row = result_data[i]; for (int j = 0; j < _s_tbls_columns.size(); j++) { - if (_s_tbls_columns[j].type == TYPE_BIGINT) { - insert_int_value(j, row.column_value[j].longVal, _workload_groups_block.get()); - } else { - insert_string_value(j, row.column_value[j].stringVal, _workload_groups_block.get()); - } + RETURN_IF_ERROR(insert_block_column( + row.column_value[j], j, _workload_groups_block.get(), _s_tbls_columns[j].type)); } } return Status::OK(); diff --git a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp new file mode 100644 index 00000000000..725544ad5a5 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/schema_scanner/schema_workload_sched_policy_scanner.h" + +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "util/thrift_rpc_helper.h" +#include "vec/common/string_ref.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_factory.hpp" + +namespace doris { +std::vector<SchemaScanner::ColumnDesc> SchemaWorkloadSchedulePolicyScanner::_s_tbls_columns = { + {"ID", TYPE_BIGINT, sizeof(int64_t), true}, + {"NAME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"CONDITION", TYPE_STRING, sizeof(StringRef), true}, + {"ACTION", TYPE_STRING, sizeof(StringRef), true}, + {"PRIORITY", TYPE_INT, sizeof(int32_t), true}, + {"ENABLED", TYPE_BOOLEAN, sizeof(bool), true}, + {"VERSION", TYPE_INT, sizeof(int32_t), true}, +}; + +SchemaWorkloadSchedulePolicyScanner::SchemaWorkloadSchedulePolicyScanner() + : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_WORKLOAD_SCHEDULE_POLICY) {} + +SchemaWorkloadSchedulePolicyScanner::~SchemaWorkloadSchedulePolicyScanner() {} + +Status SchemaWorkloadSchedulePolicyScanner::start(RuntimeState* state) { + _block_rows_limit = state->batch_size(); + _rpc_timeout = state->execution_timeout() * 1000; + return Status::OK(); +} + +Status SchemaWorkloadSchedulePolicyScanner::_get_workload_schedule_policy_block_from_fe() { + TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + + TSchemaTableRequestParams schema_table_request_params; + for (int i = 0; i < _s_tbls_columns.size(); i++) { + schema_table_request_params.__isset.columns_name = true; + schema_table_request_params.columns_name.emplace_back(_s_tbls_columns[i].name); + } + schema_table_request_params.__set_current_user_ident(*_param->common_param->current_user_ident); + + TFetchSchemaTableDataRequest request; + request.__set_schema_table_name(TSchemaTableName::WORKLOAD_SCHEDULE_POLICY); + request.__set_schema_table_params(schema_table_request_params); + + TFetchSchemaTableDataResult result; + + RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->fetchSchemaTableData(result, request); + }, + _rpc_timeout)); + + Status status(Status::create(result.status)); + if (!status.ok()) { + LOG(WARNING) << "fetch workload groups from FE failed, errmsg=" << status; + return status; + } + std::vector<TRow> result_data = result.data_batch; + + _block = vectorized::Block::create_unique(); + for (int i = 0; i < _s_tbls_columns.size(); ++i) { + TypeDescriptor descriptor(_s_tbls_columns[i].type); + auto data_type = vectorized::DataTypeFactory::instance().create_data_type(descriptor, true); + _block->insert(vectorized::ColumnWithTypeAndName(data_type->create_column(), data_type, + _s_tbls_columns[i].name)); + } + + _block->reserve(_block_rows_limit); + + if (result_data.size() > 0) { + int col_size = result_data[0].column_value.size(); + if (col_size != _s_tbls_columns.size()) { + return Status::InternalError<false>( + "workload policy schema is not match for FE and BE"); + } + } + + for (int i = 0; i < result_data.size(); i++) { + TRow row = result_data[i]; + for (int j = 0; j < _s_tbls_columns.size(); j++) { + RETURN_IF_ERROR(insert_block_column(row.column_value[j], j, _block.get(), + _s_tbls_columns[j].type)); + } + } + return Status::OK(); +} + +Status SchemaWorkloadSchedulePolicyScanner::get_next_block(vectorized::Block* block, bool* eos) { + if (!_is_init) { + return Status::InternalError("Used before initialized."); + } + + if (nullptr == block || nullptr == eos) { + return Status::InternalError("input pointer is nullptr."); + } + + if (_block == nullptr) { + RETURN_IF_ERROR(_get_workload_schedule_policy_block_from_fe()); + _total_rows = _block->rows(); + } + + if (_row_idx == _total_rows) { + *eos = true; + return Status::OK(); + } + + int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx); + vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block); + mblock.add_rows(_block.get(), _row_idx, current_batch_rows); + _row_idx += current_batch_rows; + + *eos = _row_idx == _total_rows; + return Status::OK(); +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.h b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.h new file mode 100644 index 00000000000..5284975fe66 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.h @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <vector> + +#include "common/status.h" +#include "exec/schema_scanner.h" + +namespace doris { +class RuntimeState; +namespace vectorized { +class Block; +} // namespace vectorized + +class SchemaWorkloadSchedulePolicyScanner : public SchemaScanner { + ENABLE_FACTORY_CREATOR(SchemaWorkloadSchedulePolicyScanner); + +public: + SchemaWorkloadSchedulePolicyScanner(); + ~SchemaWorkloadSchedulePolicyScanner() override; + + Status start(RuntimeState* state) override; + Status get_next_block(vectorized::Block* block, bool* eos) override; + + static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns; + +private: + Status _get_workload_schedule_policy_block_from_fe(); + + int _block_rows_limit = 4096; + int _row_idx = 0; + int _total_rows = 0; + std::unique_ptr<vectorized::Block> _block = nullptr; + int _rpc_timeout = 3000; +}; +}; // namespace doris \ No newline at end of file diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 26129f2d2c0..10c5977ddfa 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -4243,10 +4243,6 @@ show_param ::= {: RESULT = new ShowWorkloadGroupsStmt(parser.wild, parser.where); :} - | KW_WORKLOAD KW_SCHEDULE KW_POLICY - {: - RESULT = new ShowWorkloadSchedPolicyStmt(); - :} | KW_BACKENDS {: RESULT = new ShowBackendsStmt(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java index 2c0901c1e50..93d6e3e55a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java @@ -74,7 +74,11 @@ public enum SchemaTableType { SCH_ACTIVE_QUERIES("ACTIVE_QUERIES", "ACTIVE_QUERIES", TSchemaTableType.SCH_ACTIVE_QUERIES), SCH_WORKLOAD_GROUPS("WORKLOAD_GROUPS", "WORKLOAD_GROUPS", TSchemaTableType.SCH_WORKLOAD_GROUPS), SCHE_USER("user", "user", TSchemaTableType.SCH_USER), - SCH_PROCS_PRIV("procs_priv", "procs_priv", TSchemaTableType.SCH_PROCS_PRIV); + SCH_PROCS_PRIV("procs_priv", "procs_priv", TSchemaTableType.SCH_PROCS_PRIV), + + SCH_WORKLOAD_SCHEDULE_POLICY("WORKLOAD_SCHEDULE_POLICY", "WORKLOAD_SCHEDULE_POLICY", + TSchemaTableType.SCH_WORKLOAD_SCHEDULE_POLICY); + private static final String dbName = "INFORMATION_SCHEMA"; private static SelectList fullSelectLists; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowWorkloadSchedPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowWorkloadSchedPolicyStmt.java deleted file mode 100644 index a128ee3e8f7..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowWorkloadSchedPolicyStmt.java +++ /dev/null @@ -1,59 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.analysis; - -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.ScalarType; -import org.apache.doris.common.UserException; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.ShowResultSetMetaData; -import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr; - -public class ShowWorkloadSchedPolicyStmt extends ShowStmt { - - public ShowWorkloadSchedPolicyStmt() { - } - - @Override - public void analyze(Analyzer analyzer) throws UserException { - super.analyze(analyzer); - } - - @Override - public String toSql() { - return "SHOW WORKLOAD SCHEDULE POLICY"; - } - - @Override - public ShowResultSetMetaData getMetaData() { - ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); - for (String title : WorkloadSchedPolicyMgr.WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES) { - builder.addColumn(new Column(title, ScalarType.createVarchar(1000))); - } - return builder.build(); - } - - @Override - public RedirectStatus getRedirectStatus() { - if (ConnectContext.get().getSessionVariable().getForwardToMaster()) { - return RedirectStatus.FORWARD_NO_SYNC; - } else { - return RedirectStatus.NO_FORWARD; - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 89d1c363c10..a8884c61a55 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -499,6 +499,16 @@ public class SchemaTable extends Table { .column("STATE", ScalarType.createVarchar(64)) .column("INFO", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)) .build())) + .put("workload_schedule_policy", + new SchemaTable(SystemIdGenerator.getNextId(), "workload_schedule_policy", TableType.SCHEMA, + builder().column("ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("NAME", ScalarType.createVarchar(256)) + .column("CONDITION", ScalarType.createStringType()) + .column("ACTION", ScalarType.createStringType()) + .column("PRIORITY", ScalarType.createType(PrimitiveType.INT)) + .column("ENABLED", ScalarType.createType(PrimitiveType.BOOLEAN)) + .column("VERSION", ScalarType.createType(PrimitiveType.INT)) + .build())) .build(); protected SchemaTable(long id, String name, TableType type, List<Column> baseSchema) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index cc2570cc2a6..58d22b7181b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -106,7 +106,6 @@ import org.apache.doris.analysis.ShowUserPropertyStmt; import org.apache.doris.analysis.ShowVariablesStmt; import org.apache.doris.analysis.ShowViewStmt; import org.apache.doris.analysis.ShowWorkloadGroupsStmt; -import org.apache.doris.analysis.ShowWorkloadSchedPolicyStmt; import org.apache.doris.analysis.TableName; import org.apache.doris.backup.AbstractJob; import org.apache.doris.backup.BackupJob; @@ -354,8 +353,6 @@ public class ShowExecutor { handleShowResources(); } else if (stmt instanceof ShowWorkloadGroupsStmt) { handleShowWorkloadGroups(); - } else if (stmt instanceof ShowWorkloadSchedPolicyStmt) { - handleShowWorkloadSchedPolicy(); } else if (stmt instanceof ShowExportStmt) { handleShowExport(); } else if (stmt instanceof ShowBackendsStmt) { @@ -2041,12 +2038,6 @@ public class ShowExecutor { resultSet = new ShowResultSet(showStmt.getMetaData(), workloadGroupsInfos); } - private void handleShowWorkloadSchedPolicy() { - ShowWorkloadSchedPolicyStmt showStmt = (ShowWorkloadSchedPolicyStmt) stmt; - List<List<String>> workloadSchedInfo = Env.getCurrentEnv().getWorkloadSchedPolicyMgr().getShowPolicyInfo(); - resultSet = new ShowResultSet(showStmt.getMetaData(), workloadSchedInfo); - } - private void handleShowExport() throws AnalysisException { ShowExportStmt showExportStmt = (ShowExportStmt) stmt; Env env = Env.getCurrentEnv(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 60f3806dc9f..85d9ce94cdd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -98,6 +98,8 @@ public class MetadataGenerator { private static final ImmutableMap<String, Integer> ROUTINE_INFO_COLUMN_TO_INDEX; + private static final ImmutableMap<String, Integer> WORKLOAD_SCHED_POLICY_COLUMN_TO_INDEX; + static { ImmutableMap.Builder<String, Integer> activeQueriesbuilder = new ImmutableMap.Builder(); List<Column> activeQueriesColList = SchemaTable.TABLE_MAP.get("active_queries").getFullSchema(); @@ -117,6 +119,14 @@ public class MetadataGenerator { routineInfoBuilder.put(PlsqlManager.ROUTINE_INFO_TITLE_NAMES.get(i).toLowerCase(), i); } ROUTINE_INFO_COLUMN_TO_INDEX = routineInfoBuilder.build(); + + ImmutableMap.Builder<String, Integer> policyBuilder = new ImmutableMap.Builder(); + List<Column> policyColList = SchemaTable.TABLE_MAP.get("workload_schedule_policy").getFullSchema(); + for (int i = 0; i < policyColList.size(); i++) { + policyBuilder.put(policyColList.get(i).getName().toLowerCase(), i); + } + WORKLOAD_SCHED_POLICY_COLUMN_TO_INDEX = policyBuilder.build(); + } public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableDataRequest request) throws TException { @@ -156,9 +166,6 @@ public class MetadataGenerator { case TASKS: result = taskMetadataResult(params); break; - case WORKLOAD_SCHED_POLICY: - result = workloadSchedPolicyMetadataResult(params); - break; default: return errorResult("Metadata table params is not set."); } @@ -192,6 +199,10 @@ public class MetadataGenerator { result = routineInfoMetadataResult(schemaTableParams); columnIndex = ROUTINE_INFO_COLUMN_TO_INDEX; break; + case WORKLOAD_SCHEDULE_POLICY: + result = workloadSchedPolicyMetadataResult(schemaTableParams); + columnIndex = WORKLOAD_SCHED_POLICY_COLUMN_TO_INDEX; + break; default: return errorResult("invalid schema table name."); } @@ -469,7 +480,7 @@ public class MetadataGenerator { return result; } - private static TFetchSchemaTableDataResult workloadSchedPolicyMetadataResult(TMetadataTableRequestParams params) { + private static TFetchSchemaTableDataResult workloadSchedPolicyMetadataResult(TSchemaTableRequestParams params) { if (!params.isSetCurrentUserIdent()) { return errorResult("current user ident is not set."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java index 56e769cc8b3..af37bcd10e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java @@ -48,8 +48,6 @@ public abstract class MetadataTableValuedFunction extends TableValuedFunctionIf return JobsTableValuedFunction.getColumnIndexFromColumnName(columnName, params); case TASKS: return TasksTableValuedFunction.getColumnIndexFromColumnName(columnName, params); - case WORKLOAD_SCHED_POLICY: - return WorkloadSchedPolicyTableValuedFunction.getColumnIndexFromColumnName(columnName); default: throw new AnalysisException("Unknown Metadata TableValuedFunction type"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index 64e794757d1..41ed6e14cb2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -72,8 +72,6 @@ public abstract class TableValuedFunctionIf { return new TasksTableValuedFunction(params); case GroupCommitTableValuedFunction.NAME: return new GroupCommitTableValuedFunction(params); - case WorkloadSchedPolicyTableValuedFunction.NAME: - return new WorkloadSchedPolicyTableValuedFunction(params); default: throw new AnalysisException("Could not find table function " + funcName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java deleted file mode 100644 index 0bf2fa7e5d1..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java +++ /dev/null @@ -1,89 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.tablefunction; - -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.ScalarType; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.thrift.TMetaScanRange; -import org.apache.doris.thrift.TMetadataType; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -import java.util.List; -import java.util.Map; - -public class WorkloadSchedPolicyTableValuedFunction extends MetadataTableValuedFunction { - - public static final String NAME = "workload_schedule_policy"; - - private static final ImmutableList<Column> SCHEMA = ImmutableList.of( - new Column("Id", ScalarType.createType(PrimitiveType.BIGINT)), - new Column("Name", ScalarType.createStringType()), - new Column("Condition", ScalarType.createType(PrimitiveType.STRING)), - new Column("Action", ScalarType.createType(PrimitiveType.STRING)), - new Column("Priority", ScalarType.createType(PrimitiveType.INT)), - new Column("Enabled", ScalarType.createType(PrimitiveType.BOOLEAN)), - new Column("Version", ScalarType.createType(PrimitiveType.INT)), - new Column("WorkloadGroup", ScalarType.createType(PrimitiveType.STRING))); - - private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX; - - static { - ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder(); - for (int i = 0; i < SCHEMA.size(); i++) { - builder.put(SCHEMA.get(i).getName().toLowerCase(), i); - } - COLUMN_TO_INDEX = builder.build(); - } - - public static Integer getColumnIndexFromColumnName(String columnName) { - return COLUMN_TO_INDEX.get(columnName.toLowerCase()); - } - - public WorkloadSchedPolicyTableValuedFunction(Map<String, String> params) { - if (params.size() > 0) { - throw new org.apache.doris.nereids.exceptions.AnalysisException( - "workload schedule policy table-valued-function does not support any params"); - } - } - - @Override - public TMetadataType getMetadataType() { - return TMetadataType.WORKLOAD_SCHED_POLICY; - } - - @Override - public TMetaScanRange getMetaScanRange() { - TMetaScanRange metaScanRange = new TMetaScanRange(); - metaScanRange.setMetadataType(TMetadataType.WORKLOAD_SCHED_POLICY); - return metaScanRange; - } - - @Override - public String getTableName() { - return "WorkloadSchedPolicyTableValuedFunction"; - } - - @Override - public List<Column> getTableColumns() throws AnalysisException { - return SCHEMA; - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java index 26c1f5d7664..45a46d09121 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java @@ -103,7 +103,7 @@ public class RefreshCatalogTest extends TestWithFeService { List<String> dbNames2 = test1.getDbNames(); Assertions.assertEquals(5, dbNames2.size()); ExternalInfoSchemaDatabase infoDb = (ExternalInfoSchemaDatabase) test1.getDb(InfoSchemaDb.DATABASE_NAME).get(); - Assertions.assertEquals(31, infoDb.getTables().size()); + Assertions.assertEquals(32, infoDb.getTables().size()); TestExternalDatabase testDb = (TestExternalDatabase) test1.getDb("db1").get(); Assertions.assertEquals(2, testDb.getTables().size()); ExternalMysqlDatabase mysqlDb = (ExternalMysqlDatabase) test1.getDb(MysqlDb.DATABASE_NAME).get(); @@ -114,7 +114,7 @@ public class RefreshCatalogTest extends TestWithFeService { CatalogMgr mgr2 = GsonUtils.GSON.fromJson(json, CatalogMgr.class); test1 = mgr2.getCatalog("test1"); infoDb = (ExternalInfoSchemaDatabase) test1.getDb(InfoSchemaDb.DATABASE_NAME).get(); - Assertions.assertEquals(31, infoDb.getTables().size()); + Assertions.assertEquals(32, infoDb.getTables().size()); testDb = (TestExternalDatabase) test1.getDb("db1").get(); Assertions.assertEquals(2, testDb.getTables().size()); mysqlDb = (ExternalMysqlDatabase) test1.getDb(MysqlDb.DATABASE_NAME).get(); diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index d99ea0bea86..2e92ffe0ba3 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -130,7 +130,8 @@ enum TSchemaTableType { SCH_ACTIVE_QUERIES, SCH_WORKLOAD_GROUPS, SCH_USER, - SCH_PROCS_PRIV; + SCH_PROCS_PRIV, + SCH_WORKLOAD_SCHEDULE_POLICY; } enum THdfsCompression { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 9f1c1371a0e..d2996f2a5dc 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -908,6 +908,7 @@ enum TSchemaTableName { ACTIVE_QUERIES = 2, // db information_schema's table WORKLOAD_GROUPS = 3, // db information_schema's table ROUTINES_INFO = 4, // db information_schema's table + WORKLOAD_SCHEDULE_POLICY = 5, } struct TMetadataTableRequestParams { diff --git a/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out index 01ba13f742a..b7113c06cbd 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out +++ b/regression-test/data/external_table_p0/jdbc/test_mariadb_jdbc_catalog.out @@ -59,6 +59,7 @@ triggers user_privileges views workload_groups +workload_schedule_policy -- !auto_default_t -- 0 diff --git a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out index 4586d38228a..300c28b25d9 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out +++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out @@ -223,6 +223,7 @@ triggers user_privileges views workload_groups +workload_schedule_policy -- !dt -- 2023-06-17T10:00 2023-06-17T10:00:01.100 2023-06-17T10:00:02.220 2023-06-17T10:00:03.333 2023-06-17T10:00:04.444400 2023-06-17T10:00:05.555550 2023-06-17T10:00:06.666666 diff --git a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out index 09714901c14..7a6399f87a2 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out +++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog_nereids.out @@ -191,6 +191,7 @@ triggers user_privileges views workload_groups +workload_schedule_policy -- !test_insert1 -- doris1 18 diff --git a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_driver5_catalog.out b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_driver5_catalog.out index 35c805ab896..6dee70583c1 100644 --- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_driver5_catalog.out +++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_driver5_catalog.out @@ -233,6 +233,7 @@ triggers user_privileges views workload_groups +workload_schedule_policy -- !dt -- 2023-06-17T10:00 2023-06-17T10:00:01 2023-06-17T10:00:02 2023-06-17T10:00:03 2023-06-17T10:00:04 2023-06-17T10:00:05 2023-06-17T10:00:06 diff --git a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out index d32fff321e4..65b4c1901b6 100644 --- a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out +++ b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out @@ -2,7 +2,7 @@ -- !select_policy_tvf -- be_policy query_time > 10 cancel_query 10 false 0 fe_policy username = root set_session_variable "workload_group=normal" 10 false 0 -set_action_policy username = root set_session_variable "workload_group=normal" 0 true 0 +set_action_policy username = root set_session_variable "workload_group=normal" 0 false 0 test_cancel_policy query_time > 10 cancel_query 0 false 0 -- !select_policy_tvf_after_drop -- diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy index 8acfc8cb4ac..875eeb668e2 100644 --- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy +++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy @@ -301,21 +301,21 @@ suite("test_crud_wlg") { sql "alter workload group test_group properties ( 'max_queue_size'='0' );" Thread.sleep(10000) test { - sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ * from ${table_name};" + sql "select /*+SET_VAR(workload_group=test_group)*/ * from ${table_name};" exception "query waiting queue is full" } // test insert into select will go to queue test { - sql "insert into ${table_name2} select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ * from ${table_name};" + sql "insert into ${table_name2} select /*+SET_VAR(workload_group=test_group)*/ * from ${table_name};" exception "query waiting queue is full" } // test create table as select will go to queue test { - sql "create table ${table_name3} PROPERTIES('replication_num' = '1') as select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ * from ${table_name};" + sql "create table ${table_name3} PROPERTIES('replication_num' = '1') as select /*+SET_VAR(workload_group=test_group)*/ * from ${table_name};" exception "query waiting queue is full" } diff --git a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy index 8531b3cf34a..d8ab2611094 100644 --- a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy +++ b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy @@ -32,7 +32,7 @@ suite("test_workload_sched_policy") { // 2 create set policy sql "create workload schedule policy set_action_policy " + "conditions(username='root') " + - "actions(set_session_variable 'workload_group=normal');" + "actions(set_session_variable 'workload_group=normal') properties('enabled'='false');" // 3 create policy run in fe sql "create workload schedule policy fe_policy " + @@ -52,7 +52,7 @@ suite("test_workload_sched_policy") { "'priority'='10' " + ");" - qt_select_policy_tvf "select name,condition,action,priority,enabled,version from workload_schedule_policy() where name in('be_policy','fe_policy','set_action_policy','test_cancel_policy') order by name;" + qt_select_policy_tvf "select name,condition,action,priority,enabled,version from information_schema.workload_schedule_policy where name in('be_policy','fe_policy','set_action_policy','test_cancel_policy') order by name;" // test_alter sql "alter workload schedule policy fe_policy properties('priority'='2', 'enabled'='false');" @@ -112,7 +112,7 @@ suite("test_workload_sched_policy") { sql "drop workload schedule policy fe_policy;" sql "drop workload schedule policy be_policy;" - qt_select_policy_tvf_after_drop "select name,condition,action,priority,enabled,version from workload_schedule_policy() where name in('be_policy','fe_policy','set_action_policy','test_cancel_policy') order by name;" + qt_select_policy_tvf_after_drop "select name,condition,action,priority,enabled,version from information_schema.workload_schedule_policy where name in('be_policy','fe_policy','set_action_policy','test_cancel_policy') order by name;" // test workload schedule policy sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' = '500');" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org