This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 12ee9223afc [enhancement](plsql) Support select * from routines (#32866) 12ee9223afc is described below commit 12ee9223afcbcc5d8c17c2a0a89ddbcfb89a424c Author: Vallish Pai <vallish...@gmail.com> AuthorDate: Thu Mar 28 13:52:50 2024 +0530 [enhancement](plsql) Support select * from routines (#32866) Support show of plsql procedure using select * from routines. --- be/src/exec/schema_scanner.cpp | 1 + .../exec/schema_scanner/schema_routine_scanner.cpp | 172 +++++++++++++++++++++ .../exec/schema_scanner/schema_routine_scanner.h | 52 +++++++ .../apache/doris/plsql/metastore/PlsqlManager.java | 16 +- .../apache/doris/service/FrontendServiceImpl.java | 67 ++++---- .../doris/tablefunction/MetadataGenerator.java | 96 ++++++++++-- gensrc/thrift/FrontendService.thrift | 1 + 7 files changed, 362 insertions(+), 43 deletions(-) diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp index 87995a3f0ef..6c1aac7d0d1 100644 --- a/be/src/exec/schema_scanner.cpp +++ b/be/src/exec/schema_scanner.cpp @@ -37,6 +37,7 @@ #include "exec/schema_scanner/schema_partitions_scanner.h" #include "exec/schema_scanner/schema_processlist_scanner.h" #include "exec/schema_scanner/schema_profiling_scanner.h" +#include "exec/schema_scanner/schema_routine_scanner.h" #include "exec/schema_scanner/schema_rowsets_scanner.h" #include "exec/schema_scanner/schema_schema_privileges_scanner.h" #include "exec/schema_scanner/schema_schemata_scanner.h" diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.cpp b/be/src/exec/schema_scanner/schema_routine_scanner.cpp new file mode 100644 index 00000000000..7db46ada650 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_routine_scanner.cpp @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/schema_scanner/schema_routine_scanner.h" + +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "util/thrift_rpc_helper.h" +#include "vec/common/string_ref.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_factory.hpp" + +namespace doris { +std::vector<SchemaScanner::ColumnDesc> SchemaRoutinesScanner::_s_tbls_columns = { + {"SPECIFIC_NAME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"ROUTINE_CATALOG", TYPE_VARCHAR, sizeof(StringRef), true}, + {"ROUTINE_SCHEMA", TYPE_VARCHAR, sizeof(StringRef), true}, + {"ROUTINE_NAME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"ROUTINE_TYPE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"DTD_IDENTIFIER", TYPE_VARCHAR, sizeof(StringRef), true}, + {"ROUTINE_BODY", TYPE_VARCHAR, sizeof(StringRef), true}, + {"ROUTINE_DEFINITION", TYPE_VARCHAR, sizeof(StringRef), true}, + {"EXTERNAL_NAME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"EXTERNAL_LANGUAGE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"PARAMETER_STYLE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"IS_DETERMINISTIC", TYPE_VARCHAR, sizeof(StringRef), true}, + {"SQL_DATA_ACCESS", TYPE_VARCHAR, sizeof(StringRef), true}, + {"SQL_PATH", TYPE_VARCHAR, sizeof(StringRef), true}, + {"SECURITY_TYPE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"CREATED", TYPE_DATETIME, sizeof(int64_t), true}, + {"LAST_ALTERED", TYPE_DATETIME, sizeof(int64_t), true}, + {"SQL_MODE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"ROUTINE_COMMENT", TYPE_VARCHAR, sizeof(StringRef), true}, + {"DEFINER", TYPE_VARCHAR, sizeof(StringRef), true}, + {"CHARACTER_SET_CLIENT", TYPE_VARCHAR, sizeof(StringRef), true}, + {"COLLATION_CONNECTION", TYPE_VARCHAR, sizeof(StringRef), true}, + {"DATABASE_COLLATION", TYPE_VARCHAR, sizeof(StringRef), true}, +}; + +SchemaRoutinesScanner::SchemaRoutinesScanner() + : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_PROCEDURES) {} + +Status SchemaRoutinesScanner::start(RuntimeState* state) { + _block_rows_limit = state->batch_size(); + _rpc_timeout = state->execution_timeout() * 1000; + return Status::OK(); +} + +Status SchemaRoutinesScanner::get_block_from_fe() { + TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + TSchemaTableRequestParams schema_table_request_params; + for (int i = 0; i < _s_tbls_columns.size(); i++) { + schema_table_request_params.__isset.columns_name = true; + schema_table_request_params.columns_name.emplace_back(_s_tbls_columns[i].name); + } + schema_table_request_params.__set_current_user_ident(*_param->common_param->current_user_ident); + TFetchSchemaTableDataRequest request; + request.__set_schema_table_name(TSchemaTableName::ROUTINES_INFO); + request.__set_schema_table_params(schema_table_request_params); + TFetchSchemaTableDataResult result; + RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->fetchSchemaTableData(result, request); + }, + _rpc_timeout)); + Status status(Status::create(result.status)); + if (!status.ok()) { + LOG(WARNING) << "fetch routines from FE failed, errmsg=" << status; + return status; + } + std::vector<TRow> result_data = result.data_batch; + _routines_block = vectorized::Block::create_unique(); + for (int i = 0; i < _s_tbls_columns.size(); ++i) { + TypeDescriptor descriptor(_s_tbls_columns[i].type); + auto data_type = vectorized::DataTypeFactory::instance().create_data_type(descriptor, true); + _routines_block->insert(vectorized::ColumnWithTypeAndName( + data_type->create_column(), data_type, _s_tbls_columns[i].name)); + } + _routines_block->reserve(_block_rows_limit); + if (result_data.size() > 0) { + int col_size = result_data[0].column_value.size(); + if (col_size != _s_tbls_columns.size()) { + return Status::InternalError<false>("routine table schema is not match for FE and BE"); + } + } + auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) { + vectorized::MutableColumnPtr mutable_col_ptr; + mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); + auto* nullable_column = + reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get()); + vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); + reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(), + str_val.size()); + nullable_column->get_null_map_data().emplace_back(0); + }; + auto insert_datetime_value = [&](int col_index, const std::vector<void*>& datas, + vectorized::Block* block) { + vectorized::MutableColumnPtr mutable_col_ptr; + mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); + auto* nullable_column = + reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get()); + vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); + auto data = datas[0]; + reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data( + reinterpret_cast<char*>(data), 0); + nullable_column->get_null_map_data().emplace_back(0); + }; + + for (int i = 0; i < result_data.size(); i++) { + TRow row = result_data[i]; + + for (int j = 0; j < _s_tbls_columns.size(); j++) { + if (_s_tbls_columns[j].type == TYPE_DATETIME) { + std::vector<void*> datas(1); + VecDateTimeValue src[1]; + src[0].from_date_str(row.column_value[j].stringVal.data(), + row.column_value[j].stringVal.size()); + datas[0] = src; + insert_datetime_value(j, datas, _routines_block.get()); + } else { + insert_string_value(j, row.column_value[j].stringVal, _routines_block.get()); + } + } + } + return Status::OK(); +} + +Status SchemaRoutinesScanner::get_next_block(vectorized::Block* block, bool* eos) { + if (!_is_init) { + return Status::InternalError("Used before initialized."); + } + + if (nullptr == block || nullptr == eos) { + return Status::InternalError("input pointer is nullptr."); + } + + if (_routines_block == nullptr) { + RETURN_IF_ERROR(get_block_from_fe()); + _total_rows = _routines_block->rows(); + } + + if (_row_idx == _total_rows) { + *eos = true; + return Status::OK(); + } + + int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx); + vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block); + mblock.add_rows(_routines_block.get(), _row_idx, current_batch_rows); + _row_idx += current_batch_rows; + + *eos = _row_idx == _total_rows; + return Status::OK(); +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.h b/be/src/exec/schema_scanner/schema_routine_scanner.h new file mode 100644 index 00000000000..543f9e8e8f6 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_routine_scanner.h @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <vector> + +#include "common/status.h" +#include "exec/schema_scanner.h" + +namespace doris { +class RuntimeState; +namespace vectorized { +class Block; +} // namespace vectorized + +class SchemaRoutinesScanner : public SchemaScanner { + ENABLE_FACTORY_CREATOR(SchemaRoutinesScanner); + +public: + SchemaRoutinesScanner(); + ~SchemaRoutinesScanner() override = default; + + Status start(RuntimeState* state) override; + Status get_next_block(vectorized::Block* block, bool* eos) override; + + static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns; + +private: + Status get_block_from_fe(); + + int _block_rows_limit = 4096; + int _row_idx = 0; + int _total_rows = 0; + std::unique_ptr<vectorized::Block> _routines_block = nullptr; + int _rpc_timeout = 3000; +}; +}; // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/plsql/metastore/PlsqlManager.java b/fe/fe-core/src/main/java/org/apache/doris/plsql/metastore/PlsqlManager.java index 87cbd7b58a3..dc91d344f63 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plsql/metastore/PlsqlManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plsql/metastore/PlsqlManager.java @@ -22,6 +22,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonUtils; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; @@ -34,7 +35,20 @@ import java.util.Map; public class PlsqlManager implements Writable { private static final Logger LOG = LogManager.getLogger(PlsqlManager.class); - + public static final ImmutableList<String> ROUTINE_INFO_TITLE_NAMES = new ImmutableList.Builder<String>() + .add("SPECIFIC_NAME").add("ROUTINE_CATALOG").add("ROUTINE_SCHEMA").add("ROUTINE_NAME") + .add("ROUTINE_TYPE") + .add("DTD_IDENTIFIER").add("ROUTINE_BODY") + .add("ROUTINE_DEFINITION").add("EXTERNAL_NAME") + .add("EXTERNAL_LANGUAGE").add("PARAMETER_STYLE") + .add("IS_DETERMINISTIC") + .add("SQL_DATA_ACCESS").add("SQL_PATH") + .add("SECURITY_TYPE").add("CREATED") + .add("LAST_ALTERED").add("SQL_MODE") + .add("ROUTINE_COMMENT") + .add("DEFINER").add("CHARACTER_SET_CLIENT") + .add("COLLATION_CONNECTION").add("DATABASE_COLLATION") + .build(); @SerializedName(value = "nameToStoredProcedures") Map<PlsqlProcedureKey, PlsqlStoredProcedure> nameToStoredProcedures = Maps.newConcurrentMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 5269a43898c..63da3bc12e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -280,9 +280,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { private MasterImpl masterImpl; private ExecuteEnv exeEnv; - // key is txn id,value is index of plan fragment instance, it's used by multi table request plan - private ConcurrentHashMap<Long, Integer> multiTableFragmentInstanceIdIndexMap = - new ConcurrentHashMap<>(64); + // key is txn id,value is index of plan fragment instance, it's used by multi + // table request plan + private ConcurrentHashMap<Long, Integer> multiTableFragmentInstanceIdIndexMap = new ConcurrentHashMap<>(64); private static TNetworkAddress getMasterAddress() { Env env = Env.getCurrentEnv(); @@ -344,8 +344,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { } // check cooldownMetaId of all replicas are the same List<Replica> replicas = Env.getCurrentEnv().getTabletInvertedIndex().getReplicas(info.tablet_id); - // FIXME(plat1ko): We only delete remote files when tablet is under a stable state: enough replicas and - // all replicas are alive. Are these conditions really sufficient or necessary? + // FIXME(plat1ko): We only delete remote files when tablet is under a stable + // state: enough replicas and + // all replicas are alive. Are these conditions really sufficient or necessary? if (replicas.size() < replicaNum) { LOG.info("num replicas are not enough, tablet={}", info.tablet_id); return; @@ -661,7 +662,6 @@ public class FrontendServiceImpl implements FrontendService.Iface { } PatternMatcher finalMatcher = matcher; - ExecutorService executor = Executors.newSingleThreadExecutor(); Future<?> future = executor.submit(() -> { @@ -989,7 +989,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), params.getClientNodeHost()); } ConnectContext context = new ConnectContext(null, true); - // Set current connected FE to the client address, so that we can know where this request come from. + // Set current connected FE to the client address, so that we can know where + // this request come from. context.setCurrentConnectedFEIp(params.getClientNodeHost()); ConnectProcessor processor = null; @@ -1327,7 +1328,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { OlapTable table = (OlapTable) db.getTableOrMetaException(tbl, TableType.OLAP); tables.add(table); } - // if it has multi table, use multi table and update multi table running transaction table ids + // if it has multi table, use multi table and update multi table running + // transaction table ids if (CollectionUtils.isNotEmpty(request.getTbls())) { List<Long> multiTableIds = tables.stream().map(Table::getId).collect(Collectors.toList()); Env.getCurrentGlobalTransactionMgr() @@ -1517,7 +1519,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } - // return true if commit success and publish success, return false if publish timeout + // return true if commit success and publish success, return false if publish + // timeout private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserException { if (request.isSetAuthCode()) { // TODO(cmy): find a way to check @@ -1595,7 +1598,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } - // return true if commit success and publish success, return false if publish timeout + // return true if commit success and publish success, return false if publish + // timeout private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException { /// Check required arg: user, passwd, db, txn_id, commit_infos if (!request.isSetUser()) { @@ -1660,7 +1664,6 @@ public class FrontendServiceImpl implements FrontendService.Iface { // Step 4: get timeout long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000; - // Step 5: commit and publish return Env.getCurrentGlobalTransactionMgr() .commitAndPublishTransaction(db, tableList, @@ -1911,7 +1914,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { } /** - * For first-class multi-table scenarios, we should store the mapping between Txn and data source type in a common + * For first-class multi-table scenarios, we should store the mapping between + * Txn and data source type in a common * place. Since there is only Kafka now, we should do this first. */ private void buildMultiTableStreamLoadTask(StreamLoadTask baseTaskInfo, long txnId) { @@ -1982,7 +1986,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { RoutineLoadJob routineLoadJob = Env.getCurrentEnv().getRoutineLoadManager() .getRoutineLoadJobByMultiLoadTaskTxnId(request.getTxnId()); routineLoadJob.updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.CANNOT_RESUME_ERR, - "failed to get stream load plan, " + exception.getMessage()), false); + "failed to get stream load plan, " + exception.getMessage()), false); } catch (UserException e) { LOG.warn("catch update routine load job error.", e); } @@ -2039,9 +2043,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { coord.setQueryType(TQueryType.LOAD); TableIf table = httpStreamParams.getTable(); if (table instanceof OlapTable) { - boolean isEnableMemtableOnSinkNode = - ((OlapTable) table).getTableProperty().getUseSchemaLightChange() - ? coord.getQueryOptions().isEnableMemtableOnSinkNode() : false; + boolean isEnableMemtableOnSinkNode = ((OlapTable) table).getTableProperty().getUseSchemaLightChange() + ? coord.getQueryOptions().isEnableMemtableOnSinkNode() + : false; coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode); } httpStreamParams.setParams(coord.getStreamLoadPlan()); @@ -2123,7 +2127,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { try { if (!((OlapTable) table).getTableProperty().getUseSchemaLightChange() && (request.getGroupCommitMode() != null - && !request.getGroupCommitMode().equals("off_mode"))) { + && !request.getGroupCommitMode().equals("off_mode"))) { throw new UserException( "table light_schema_change is false, can't do stream load with group commit mode"); } @@ -2783,7 +2787,6 @@ public class FrontendServiceImpl implements FrontendService.Iface { throw new UserException("prev_commit_seq is not set"); } - // step 1: check auth if (Strings.isNullOrEmpty(request.getToken())) { checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), @@ -2876,7 +2879,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { // getSnapshotImpl private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String clientIp) throws UserException { - // Step 1: Check all required arg: user, passwd, db, label_name, snapshot_name, snapshot_type + // Step 1: Check all required arg: user, passwd, db, label_name, snapshot_name, + // snapshot_type if (!request.isSetUser()) { throw new UserException("user is not set"); } @@ -2957,7 +2961,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { // restoreSnapshotImpl private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest request, String clientIp) throws UserException { - // Step 1: Check all required arg: user, passwd, db, label_name, repo_name, meta, info + // Step 1: Check all required arg: user, passwd, db, label_name, repo_name, + // meta, info if (!request.isSetUser()) { throw new UserException("user is not set"); } @@ -3224,7 +3229,6 @@ public class FrontendServiceImpl implements FrontendService.Iface { throw new UserException("prev_commit_seq is not set"); } - // step 1: check auth if (Strings.isNullOrEmpty(request.getToken())) { checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), @@ -3506,8 +3510,10 @@ public class FrontendServiceImpl implements FrontendService.Iface { List<String> allReqPartNames; // all request partitions try { taskLock.lock(); - // we dont lock the table. other thread in this txn will be controled by taskLock. - // if we have already replaced. dont do it again, but acquire the recorded new partition directly. + // we dont lock the table. other thread in this txn will be controled by + // taskLock. + // if we have already replaced. dont do it again, but acquire the recorded new + // partition directly. // if not by this txn, just let it fail naturally is ok. List<Long> replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds); // here if replacedPartIds still have null. this will throw exception. @@ -3517,7 +3523,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { .filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced .mapToObj(partitionIds::get) .collect(Collectors.toList()); - // from here we ONLY deal the pending partitions. not include the dealed(by others). + // from here we ONLY deal the pending partitions. not include the dealed(by + // others). if (!pendingPartitionIds.isEmpty()) { // below two must have same order inner. List<String> pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds); @@ -3528,7 +3535,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { overwriteManager.registerTaskInGroup(taskGroupId, taskId); InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames); InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames); - // now temp partitions are bumped up and use new names. we get their ids and record them. + // now temp partitions are bumped up and use new names. we get their ids and + // record them. List<Long> newPartitionIds = new ArrayList<Long>(); for (String newPartName : pendingPartitionNames) { newPartitionIds.add(olapTable.getPartition(newPartName).getId()); @@ -3550,8 +3558,10 @@ public class FrontendServiceImpl implements FrontendService.Iface { taskLock.unlock(); } - // build partition & tablets. now all partitions in allReqPartNames are replaced an recorded. - // so they won't be changed again. if other transaction changing it. just let it fail. + // build partition & tablets. now all partitions in allReqPartNames are replaced + // an recorded. + // so they won't be changed again. if other transaction changing it. just let it + // fail. List<TOlapTablePartition> partitions = Lists.newArrayList(); List<TTabletLocation> tablets = Lists.newArrayList(); PartitionInfo partitionInfo = olapTable.getPartitionInfo(); @@ -3647,7 +3657,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { private TGetMetaResult getMetaImpl(TGetMetaRequest request, String clientIp) throws Exception { - // Step 1: check fields + // Step 1: check fields if (!request.isSetUser()) { throw new UserException("user is not set"); } @@ -3720,7 +3730,6 @@ public class FrontendServiceImpl implements FrontendService.Iface { } } - @Override public TGetColumnInfoResult getColumnInfo(TGetColumnInfoRequest request) { TGetColumnInfoResult result = new TGetColumnInfoResult(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index da58b6e94ab..ac4a8558887 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -19,6 +19,7 @@ package org.apache.doris.tablefunction; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.SchemaTable; @@ -38,6 +39,9 @@ import org.apache.doris.job.extensions.mtmv.MTMVJob; import org.apache.doris.job.task.AbstractTask; import org.apache.doris.mtmv.MTMVPartitionUtil; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.plsql.metastore.PlsqlManager; +import org.apache.doris.plsql.metastore.PlsqlProcedureKey; +import org.apache.doris.plsql.metastore.PlsqlStoredProcedure; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.QeProcessorImpl.QueryInfo; @@ -92,6 +96,8 @@ public class MetadataGenerator { private static final ImmutableMap<String, Integer> WORKLOAD_GROUPS_COLUMN_TO_INDEX; + private static final ImmutableMap<String, Integer> ROUTINE_INFO_COLUMN_TO_INDEX; + static { ImmutableMap.Builder<String, Integer> activeQueriesbuilder = new ImmutableMap.Builder(); List<Column> activeQueriesColList = SchemaTable.TABLE_MAP.get("active_queries").getFullSchema(); @@ -105,6 +111,12 @@ public class MetadataGenerator { workloadGroupBuilder.put(WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i).toLowerCase(), i); } WORKLOAD_GROUPS_COLUMN_TO_INDEX = workloadGroupBuilder.build(); + + ImmutableMap.Builder<String, Integer> routineInfoBuilder = new ImmutableMap.Builder(); + for (int i = 0; i < PlsqlManager.ROUTINE_INFO_TITLE_NAMES.size(); i++) { + routineInfoBuilder.put(PlsqlManager.ROUTINE_INFO_TITLE_NAMES.get(i).toLowerCase(), i); + } + ROUTINE_INFO_COLUMN_TO_INDEX = routineInfoBuilder.build(); } public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableDataRequest request) throws TException { @@ -176,6 +188,10 @@ public class MetadataGenerator { result = workloadGroupsMetadataResult(schemaTableParams); columnIndex = WORKLOAD_GROUPS_COLUMN_TO_INDEX; break; + case ROUTINES_INFO: + result = routineInfoMetadataResult(schemaTableParams); + columnIndex = ROUTINE_INFO_COLUMN_TO_INDEX; + break; default: return errorResult("invalid schema table name."); } @@ -198,7 +214,7 @@ public class MetadataGenerator { return errorResult("Iceberg metadata params is not set."); } - TIcebergMetadataParams icebergMetadataParams = params.getIcebergMetadataParams(); + TIcebergMetadataParams icebergMetadataParams = params.getIcebergMetadataParams(); TIcebergQueryType icebergQueryType = icebergMetadataParams.getIcebergQueryType(); IcebergMetadataCache icebergMetadataCache = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(); List<TRow> dataBatch = Lists.newArrayList(); @@ -385,7 +401,7 @@ public class MetadataGenerator { private static TFetchSchemaTableDataResult catalogsMetadataResult(TMetadataTableRequestParams params) { TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); - List<CatalogIf> info = Env.getCurrentEnv().getCatalogMgr().listCatalogs(); + List<CatalogIf> info = Env.getCurrentEnv().getCatalogMgr().listCatalogs(); List<TRow> dataBatch = Lists.newArrayList(); for (CatalogIf catalog : info) { @@ -426,15 +442,15 @@ public class MetadataGenerator { List<TRow> dataBatch = Lists.newArrayList(); for (List<String> rGroupsInfo : workloadGroupsInfo) { TRow trow = new TRow(); - trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(0)))); // id - trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(1))); // name + trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(0)))); // id + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(1))); // name trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(2)))); // cpu_share - trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(3))); // mem_limit - trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(4))); // mem overcommit + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(3))); // mem_limit + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(4))); // mem overcommit trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(5)))); // max concurrent trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(6)))); // max queue size trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(7)))); // queue timeout - trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(8))); // cpu hard limit + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(8))); // cpu hard limit trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(9)))); // scan thread num // max remote scan thread num trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10)))); @@ -465,11 +481,11 @@ public class MetadataGenerator { List<TRow> dataBatch = Lists.newArrayList(); for (List<String> policyRow : workloadPolicyList) { TRow trow = new TRow(); - trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(policyRow.get(0)))); // id - trow.addToColumnValue(new TCell().setStringVal(policyRow.get(1))); // name - trow.addToColumnValue(new TCell().setStringVal(policyRow.get(2))); // condition - trow.addToColumnValue(new TCell().setStringVal(policyRow.get(3))); // action - trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(4)))); // priority + trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(policyRow.get(0)))); // id + trow.addToColumnValue(new TCell().setStringVal(policyRow.get(1))); // name + trow.addToColumnValue(new TCell().setStringVal(policyRow.get(2))); // condition + trow.addToColumnValue(new TCell().setStringVal(policyRow.get(3))); // action + trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(4)))); // priority trow.addToColumnValue(new TCell().setBoolVal(Boolean.valueOf(policyRow.get(5)))); // enabled trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(6)))); // version dataBatch.add(trow); @@ -562,7 +578,7 @@ public class MetadataGenerator { List<TFetchSchemaTableDataResult> relayResults = forwardToOtherFrontends(replayFetchSchemaTableReq); relayResults .forEach(rs -> rs.getDataBatch() - .forEach(row -> dataBatch.add(row))); + .forEach(row -> dataBatch.add(row))); } result.setDataBatch(dataBatch); @@ -791,4 +807,58 @@ public class MetadataGenerator { result.setStatus(new TStatus(TStatusCode.OK)); return result; } + + private static TFetchSchemaTableDataResult routineInfoMetadataResult(TSchemaTableRequestParams params) { + if (!params.isSetCurrentUserIdent()) { + return errorResult("current user ident is not set."); + } + + PlsqlManager plSqlClient = Env.getCurrentEnv().getPlsqlManager(); + + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + List<TRow> dataBatch = Lists.newArrayList(); + + Map<PlsqlProcedureKey, PlsqlStoredProcedure> allProc = plSqlClient.getAllPlsqlStoredProcedures(); + for (Map.Entry<PlsqlProcedureKey, PlsqlStoredProcedure> entry : allProc.entrySet()) { + PlsqlStoredProcedure proc = entry.getValue(); + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setStringVal(proc.getName())); // SPECIFIC_NAME + trow.addToColumnValue(new TCell().setStringVal(Long.toString(proc.getCatalogId()))); // ROUTINE_CATALOG + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(proc.getCatalogId()); + if (catalog != null) { + DatabaseIf db = catalog.getDbNullable(proc.getDbId()); + if (db != null) { + trow.addToColumnValue(new TCell().setStringVal(db.getFullName())); // ROUTINE_SCHEMA + } else { + trow.addToColumnValue(new TCell().setStringVal("")); // ROUTINE_SCHEMA + } + } else { + trow.addToColumnValue(new TCell().setStringVal("")); // ROUTINE_SCHEMA + } + trow.addToColumnValue(new TCell().setStringVal(proc.getName())); // ROUTINE_NAME + trow.addToColumnValue(new TCell().setStringVal("PROCEDURE")); // ROUTINE_TYPE + trow.addToColumnValue(new TCell().setStringVal("")); // DTD_IDENTIFIER + trow.addToColumnValue(new TCell().setStringVal(proc.getSource())); // ROUTINE_BODY + trow.addToColumnValue(new TCell().setStringVal("")); // ROUTINE_DEFINITION + trow.addToColumnValue(new TCell().setStringVal("NULL")); // EXTERNAL_NAME + trow.addToColumnValue(new TCell().setStringVal("")); // EXTERNAL_LANGUAGE + trow.addToColumnValue(new TCell().setStringVal("SQL")); // PARAMETER_STYLE + trow.addToColumnValue(new TCell().setStringVal("")); // IS_DETERMINISTIC + trow.addToColumnValue(new TCell().setStringVal("")); // SQL_DATA_ACCESS + trow.addToColumnValue(new TCell().setStringVal("NULL")); // SQL_PATH + trow.addToColumnValue(new TCell().setStringVal("DEFINER")); // SECURITY_TYPE + trow.addToColumnValue(new TCell().setStringVal(proc.getCreateTime())); // CREATED + trow.addToColumnValue(new TCell().setStringVal(proc.getModifyTime())); // LAST_ALTERED + trow.addToColumnValue(new TCell().setStringVal("")); // SQ_MODE + trow.addToColumnValue(new TCell().setStringVal("")); // ROUTINE_COMMENT + trow.addToColumnValue(new TCell().setStringVal(proc.getOwnerName())); // DEFINER + trow.addToColumnValue(new TCell().setStringVal("")); // CHARACTER_SET_CLIENT + trow.addToColumnValue(new TCell().setStringVal("")); // COLLATION_CONNECTION + trow.addToColumnValue(new TCell().setStringVal("")); // DATABASE_COLLATION + dataBatch.add(trow); + } + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 1bd12c364bb..9f1c1371a0e 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -907,6 +907,7 @@ enum TSchemaTableName { METADATA_TABLE = 1, // tvf ACTIVE_QUERIES = 2, // db information_schema's table WORKLOAD_GROUPS = 3, // db information_schema's table + ROUTINES_INFO = 4, // db information_schema's table } struct TMetadataTableRequestParams { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org