This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 12ee9223afc [enhancement](plsql) Support select * from routines (#32866)
12ee9223afc is described below

commit 12ee9223afcbcc5d8c17c2a0a89ddbcfb89a424c
Author: Vallish Pai <vallish...@gmail.com>
AuthorDate: Thu Mar 28 13:52:50 2024 +0530

    [enhancement](plsql) Support select * from routines (#32866)
    
    Support showing PL/SQL stored procedures via select * from routines.
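    
    A minimal usage sketch, assuming a PL/SQL stored procedure already
    exists ('my_proc' below is a hypothetical name; the table lives in
    information_schema, per the Thrift comment at the end of this patch):
    
        SELECT * FROM information_schema.routines;
    
        -- Project a few of the new columns for one procedure.
        SELECT ROUTINE_NAME, ROUTINE_TYPE, DEFINER
        FROM information_schema.routines
        WHERE ROUTINE_NAME = 'my_proc';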
---
 be/src/exec/schema_scanner.cpp                     |   1 +
 .../exec/schema_scanner/schema_routine_scanner.cpp | 172 +++++++++++++++++++++
 .../exec/schema_scanner/schema_routine_scanner.h   |  52 +++++++
 .../apache/doris/plsql/metastore/PlsqlManager.java |  16 +-
 .../apache/doris/service/FrontendServiceImpl.java  |  67 ++++----
 .../doris/tablefunction/MetadataGenerator.java     |  96 ++++++++++--
 gensrc/thrift/FrontendService.thrift               |   1 +
 7 files changed, 362 insertions(+), 43 deletions(-)

diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index 87995a3f0ef..6c1aac7d0d1 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -37,6 +37,7 @@
 #include "exec/schema_scanner/schema_partitions_scanner.h"
 #include "exec/schema_scanner/schema_processlist_scanner.h"
 #include "exec/schema_scanner/schema_profiling_scanner.h"
+#include "exec/schema_scanner/schema_routine_scanner.h"
 #include "exec/schema_scanner/schema_rowsets_scanner.h"
 #include "exec/schema_scanner/schema_schema_privileges_scanner.h"
 #include "exec/schema_scanner/schema_schemata_scanner.h"
diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.cpp b/be/src/exec/schema_scanner/schema_routine_scanner.cpp
new file mode 100644
index 00000000000..7db46ada650
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_routine_scanner.cpp
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/schema_scanner/schema_routine_scanner.h"
+
+#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+#include "util/thrift_rpc_helper.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_factory.hpp"
+
+namespace doris {
+std::vector<SchemaScanner::ColumnDesc> SchemaRoutinesScanner::_s_tbls_columns = {
+        {"SPECIFIC_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"ROUTINE_CATALOG", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"ROUTINE_SCHEMA", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"ROUTINE_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"ROUTINE_TYPE", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"DTD_IDENTIFIER", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"ROUTINE_BODY", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"ROUTINE_DEFINITION", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"EXTERNAL_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"EXTERNAL_LANGUAGE", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"PARAMETER_STYLE", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"IS_DETERMINISTIC", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"SQL_DATA_ACCESS", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"SQL_PATH", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"SECURITY_TYPE", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"CREATED", TYPE_DATETIME, sizeof(int64_t), true},
+        {"LAST_ALTERED", TYPE_DATETIME, sizeof(int64_t), true},
+        {"SQL_MODE", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"ROUTINE_COMMENT", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"DEFINER", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"CHARACTER_SET_CLIENT", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"COLLATION_CONNECTION", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"DATABASE_COLLATION", TYPE_VARCHAR, sizeof(StringRef), true},
+};
+
+SchemaRoutinesScanner::SchemaRoutinesScanner()
+        : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_PROCEDURES) {}
+
+Status SchemaRoutinesScanner::start(RuntimeState* state) {
+    _block_rows_limit = state->batch_size();
+    _rpc_timeout = state->execution_timeout() * 1000;
+    return Status::OK();
+}
+
+Status SchemaRoutinesScanner::get_block_from_fe() {
+    TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
+    TSchemaTableRequestParams schema_table_request_params;
+    for (int i = 0; i < _s_tbls_columns.size(); i++) {
+        schema_table_request_params.__isset.columns_name = true;
+        schema_table_request_params.columns_name.emplace_back(_s_tbls_columns[i].name);
+    }
+    schema_table_request_params.__set_current_user_ident(*_param->common_param->current_user_ident);
+    TFetchSchemaTableDataRequest request;
+    request.__set_schema_table_name(TSchemaTableName::ROUTINES_INFO);
+    request.__set_schema_table_params(schema_table_request_params);
+    TFetchSchemaTableDataResult result;
+    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&request, &result](FrontendServiceConnection& client) {
+                client->fetchSchemaTableData(result, request);
+            },
+            _rpc_timeout));
+    Status status(Status::create(result.status));
+    if (!status.ok()) {
+        LOG(WARNING) << "fetch routines from FE failed, errmsg=" << status;
+        return status;
+    }
+    std::vector<TRow> result_data = result.data_batch;
+    _routines_block = vectorized::Block::create_unique();
+    for (int i = 0; i < _s_tbls_columns.size(); ++i) {
+        TypeDescriptor descriptor(_s_tbls_columns[i].type);
+        auto data_type = vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
+        _routines_block->insert(vectorized::ColumnWithTypeAndName(
+                data_type->create_column(), data_type, _s_tbls_columns[i].name));
+    }
+    _routines_block->reserve(_block_rows_limit);
+    if (result_data.size() > 0) {
+        int col_size = result_data[0].column_value.size();
+        if (col_size != _s_tbls_columns.size()) {
+            return Status::InternalError<false>("routine table schema is not 
match for FE and BE");
+        }
+    }
+    auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) {
+        vectorized::MutableColumnPtr mutable_col_ptr;
+        mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
+        auto* nullable_column =
+                reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
+        vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
+        reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(),
+                                                                          str_val.size());
+        nullable_column->get_null_map_data().emplace_back(0);
+    };
+    auto insert_datetime_value = [&](int col_index, const std::vector<void*>& datas,
+                                     vectorized::Block* block) {
+        vectorized::MutableColumnPtr mutable_col_ptr;
+        mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
+        auto* nullable_column =
+                reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
+        vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
+        auto data = datas[0];
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
+                reinterpret_cast<char*>(data), 0);
+        nullable_column->get_null_map_data().emplace_back(0);
+    };
+
+    for (int i = 0; i < result_data.size(); i++) {
+        TRow row = result_data[i];
+
+        for (int j = 0; j < _s_tbls_columns.size(); j++) {
+            if (_s_tbls_columns[j].type == TYPE_DATETIME) {
+                std::vector<void*> datas(1);
+                VecDateTimeValue src[1];
+                src[0].from_date_str(row.column_value[j].stringVal.data(),
+                                     row.column_value[j].stringVal.size());
+                datas[0] = src;
+                insert_datetime_value(j, datas, _routines_block.get());
+            } else {
+                insert_string_value(j, row.column_value[j].stringVal, _routines_block.get());
+            }
+        }
+    }
+    return Status::OK();
+}
+
+Status SchemaRoutinesScanner::get_next_block(vectorized::Block* block, bool* eos) {
+    if (!_is_init) {
+        return Status::InternalError("Used before initialized.");
+    }
+
+    if (nullptr == block || nullptr == eos) {
+        return Status::InternalError("input pointer is nullptr.");
+    }
+
+    if (_routines_block == nullptr) {
+        RETURN_IF_ERROR(get_block_from_fe());
+        _total_rows = _routines_block->rows();
+    }
+
+    if (_row_idx == _total_rows) {
+        *eos = true;
+        return Status::OK();
+    }
+
+    int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
+    vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
+    mblock.add_rows(_routines_block.get(), _row_idx, current_batch_rows);
+    _row_idx += current_batch_rows;
+
+    *eos = _row_idx == _total_rows;
+    return Status::OK();
+}
+
+} // namespace doris
\ No newline at end of file
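
Since the scanner materializes CREATED and LAST_ALTERED as TYPE_DATETIME
(parsed above via from_date_str), datetime predicates should work against
them directly; a sketch, with an illustrative cutoff value:

    SELECT ROUTINE_NAME, CREATED, LAST_ALTERED
    FROM information_schema.routines
    WHERE CREATED >= '2024-01-01 00:00:00';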
diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.h b/be/src/exec/schema_scanner/schema_routine_scanner.h
new file mode 100644
index 00000000000..543f9e8e8f6
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_routine_scanner.h
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "common/status.h"
+#include "exec/schema_scanner.h"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+class SchemaRoutinesScanner : public SchemaScanner {
+    ENABLE_FACTORY_CREATOR(SchemaRoutinesScanner);
+
+public:
+    SchemaRoutinesScanner();
+    ~SchemaRoutinesScanner() override = default;
+
+    Status start(RuntimeState* state) override;
+    Status get_next_block(vectorized::Block* block, bool* eos) override;
+
+    static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
+
+private:
+    Status get_block_from_fe();
+
+    int _block_rows_limit = 4096;
+    int _row_idx = 0;
+    int _total_rows = 0;
+    std::unique_ptr<vectorized::Block> _routines_block = nullptr;
+    int _rpc_timeout = 3000;
+};
+}; // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plsql/metastore/PlsqlManager.java b/fe/fe-core/src/main/java/org/apache/doris/plsql/metastore/PlsqlManager.java
index 87cbd7b58a3..dc91d344f63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plsql/metastore/PlsqlManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plsql/metastore/PlsqlManager.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
@@ -34,7 +35,20 @@ import java.util.Map;
 
 public class PlsqlManager implements Writable {
     private static final Logger LOG = LogManager.getLogger(PlsqlManager.class);
-
+    public static final ImmutableList<String> ROUTINE_INFO_TITLE_NAMES = new ImmutableList.Builder<String>()
+            .add("SPECIFIC_NAME").add("ROUTINE_CATALOG").add("ROUTINE_SCHEMA").add("ROUTINE_NAME")
+            .add("ROUTINE_TYPE")
+            .add("DTD_IDENTIFIER").add("ROUTINE_BODY")
+            .add("ROUTINE_DEFINITION").add("EXTERNAL_NAME")
+            .add("EXTERNAL_LANGUAGE").add("PARAMETER_STYLE")
+            .add("IS_DETERMINISTIC")
+            .add("SQL_DATA_ACCESS").add("SQL_PATH")
+            .add("SECURITY_TYPE").add("CREATED")
+            .add("LAST_ALTERED").add("SQL_MODE")
+            .add("ROUTINE_COMMENT")
+            .add("DEFINER").add("CHARACTER_SET_CLIENT")
+            .add("COLLATION_CONNECTION").add("DATABASE_COLLATION")
+            .build();
     @SerializedName(value = "nameToStoredProcedures")
    Map<PlsqlProcedureKey, PlsqlStoredProcedure> nameToStoredProcedures = Maps.newConcurrentMap();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 5269a43898c..63da3bc12e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -280,9 +280,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
 
     private MasterImpl masterImpl;
     private ExecuteEnv exeEnv;
-    // key is txn id,value is index of plan fragment instance, it's used by multi table request plan
-    private ConcurrentHashMap<Long, Integer> multiTableFragmentInstanceIdIndexMap =
-            new ConcurrentHashMap<>(64);
+    // key is txn id,value is index of plan fragment instance, it's used by multi
+    // table request plan
+    private ConcurrentHashMap<Long, Integer> multiTableFragmentInstanceIdIndexMap = new ConcurrentHashMap<>(64);
 
     private static TNetworkAddress getMasterAddress() {
         Env env = Env.getCurrentEnv();
@@ -344,8 +344,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             }
             // check cooldownMetaId of all replicas are the same
            List<Replica> replicas = Env.getCurrentEnv().getTabletInvertedIndex().getReplicas(info.tablet_id);
-            // FIXME(plat1ko): We only delete remote files when tablet is under a stable state: enough replicas and
-            //  all replicas are alive. Are these conditions really sufficient or necessary?
+            // FIXME(plat1ko): We only delete remote files when tablet is under a stable
+            // state: enough replicas and
+            // all replicas are alive. Are these conditions really sufficient or necessary?
             if (replicas.size() < replicaNum) {
                 LOG.info("num replicas are not enough, tablet={}", info.tablet_id);
                 return;
@@ -661,7 +662,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
         PatternMatcher finalMatcher = matcher;
 
-
         ExecutorService executor = Executors.newSingleThreadExecutor();
         Future<?> future = executor.submit(() -> {
 
@@ -989,7 +989,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             LOG.debug("receive forwarded stmt {} from FE: {}", 
params.getStmtId(), params.getClientNodeHost());
         }
         ConnectContext context = new ConnectContext(null, true);
-        // Set current connected FE to the client address, so that we can know 
where this request come from.
+        // Set current connected FE to the client address, so that we can know 
where
+        // this request come from.
         context.setCurrentConnectedFEIp(params.getClientNodeHost());
 
         ConnectProcessor processor = null;
@@ -1327,7 +1328,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
            OlapTable table = (OlapTable) db.getTableOrMetaException(tbl, TableType.OLAP);
            tables.add(table);
        }
-        // if it has multi table, use multi table and update multi table running transaction table ids
+        // if it has multi table, use multi table and update multi table running
+        // transaction table ids
        if (CollectionUtils.isNotEmpty(request.getTbls())) {
            List<Long> multiTableIds = tables.stream().map(Table::getId).collect(Collectors.toList());
             Env.getCurrentGlobalTransactionMgr()
@@ -1517,7 +1519,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         return result;
     }
 
-    // return true if commit success and publish success, return false if publish timeout
+    // return true if commit success and publish success, return false if publish
+    // timeout
    private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserException {
         if (request.isSetAuthCode()) {
             // TODO(cmy): find a way to check
@@ -1595,7 +1598,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         return result;
     }
 
-    // return true if commit success and publish success, return false if publish timeout
+    // return true if commit success and publish success, return false if publish
+    // timeout
    private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException {
         /// Check required arg: user, passwd, db, txn_id, commit_infos
         if (!request.isSetUser()) {
@@ -1660,7 +1664,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         // Step 4: get timeout
        long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000;
 
-
         // Step 5: commit and publish
         return Env.getCurrentGlobalTransactionMgr()
                 .commitAndPublishTransaction(db, tableList,
@@ -1911,7 +1914,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
     }
 
     /**
-     * For first-class multi-table scenarios, we should store the mapping between Txn and data source type in a common
+     * For first-class multi-table scenarios, we should store the mapping between
+     * Txn and data source type in a common
      * place. Since there is only Kafka now, we should do this first.
      */
    private void buildMultiTableStreamLoadTask(StreamLoadTask baseTaskInfo, long txnId) {
@@ -1982,7 +1986,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                RoutineLoadJob routineLoadJob = Env.getCurrentEnv().getRoutineLoadManager()
                        .getRoutineLoadJobByMultiLoadTaskTxnId(request.getTxnId());
                routineLoadJob.updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.CANNOT_RESUME_ERR,
-                            "failed to get stream load plan, " + exception.getMessage()), false);
+                        "failed to get stream load plan, " + exception.getMessage()), false);
             } catch (UserException e) {
                 LOG.warn("catch update routine load job error.", e);
             }
@@ -2039,9 +2043,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             coord.setQueryType(TQueryType.LOAD);
             TableIf table = httpStreamParams.getTable();
             if (table instanceof OlapTable) {
-                boolean isEnableMemtableOnSinkNode =
-                        ((OlapTable) table).getTableProperty().getUseSchemaLightChange()
-                            ? coord.getQueryOptions().isEnableMemtableOnSinkNode() : false;
+                boolean isEnableMemtableOnSinkNode = ((OlapTable) table).getTableProperty().getUseSchemaLightChange()
+                        ? coord.getQueryOptions().isEnableMemtableOnSinkNode()
+                        : false;
                coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
             }
             httpStreamParams.setParams(coord.getStreamLoadPlan());
@@ -2123,7 +2127,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         try {
            if (!((OlapTable) table).getTableProperty().getUseSchemaLightChange()
                    && (request.getGroupCommitMode() != null
-                    && !request.getGroupCommitMode().equals("off_mode"))) {
+                            && !request.getGroupCommitMode().equals("off_mode"))) {
                throw new UserException(
                        "table light_schema_change is false, can't do stream load with group commit mode");
             }
@@ -2783,7 +2787,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             throw new UserException("prev_commit_seq is not set");
         }
 
-
         // step 1: check auth
         if (Strings.isNullOrEmpty(request.getToken())) {
            checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
@@ -2876,7 +2879,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
     // getSnapshotImpl
    private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String clientIp)
            throws UserException {
-        // Step 1: Check all required arg: user, passwd, db, label_name, snapshot_name, snapshot_type
+        // Step 1: Check all required arg: user, passwd, db, label_name, snapshot_name,
+        // snapshot_type
         if (!request.isSetUser()) {
             throw new UserException("user is not set");
         }
@@ -2957,7 +2961,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
     // restoreSnapshotImpl
    private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest request, String clientIp)
            throws UserException {
-        // Step 1: Check all required arg: user, passwd, db, label_name, repo_name, meta, info
+        // Step 1: Check all required arg: user, passwd, db, label_name, repo_name,
+        // meta, info
         if (!request.isSetUser()) {
             throw new UserException("user is not set");
         }
@@ -3224,7 +3229,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             throw new UserException("prev_commit_seq is not set");
         }
 
-
         // step 1: check auth
         if (Strings.isNullOrEmpty(request.getToken())) {
            checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
@@ -3506,8 +3510,10 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         List<String> allReqPartNames; // all request partitions
         try {
             taskLock.lock();
-            // we dont lock the table. other thread in this txn will be controled by taskLock.
-            // if we have already replaced. dont do it again, but acquire the recorded new partition directly.
+            // we dont lock the table. other thread in this txn will be controled by
+            // taskLock.
+            // if we have already replaced. dont do it again, but acquire the recorded new
+            // partition directly.
             // if not by this txn, just let it fail naturally is ok.
             List<Long> replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds);
             // here if replacedPartIds still have null. this will throw exception.
@@ -3517,7 +3523,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                    .filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced
                    .mapToObj(partitionIds::get)
                    .collect(Collectors.toList());
-            // from here we ONLY deal the pending partitions. not include the dealed(by others).
+            // from here we ONLY deal the pending partitions. not include the dealed(by
+            // others).
            if (!pendingPartitionIds.isEmpty()) {
                // below two must have same order inner.
                List<String> pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds);
@@ -3528,7 +3535,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                 overwriteManager.registerTaskInGroup(taskGroupId, taskId);
                InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames);
                InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames);
-                // now temp partitions are bumped up and use new names. we get their ids and record them.
+                // now temp partitions are bumped up and use new names. we get their ids and
+                // record them.
                List<Long> newPartitionIds = new ArrayList<Long>();
                for (String newPartName : pendingPartitionNames) {
                    newPartitionIds.add(olapTable.getPartition(newPartName).getId());
@@ -3550,8 +3558,10 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             taskLock.unlock();
         }
 
-        // build partition & tablets. now all partitions in allReqPartNames 
are replaced an recorded.
-        // so they won't be changed again. if other transaction changing it. 
just let it fail.
+        // build partition & tablets. now all partitions in allReqPartNames 
are replaced
+        // an recorded.
+        // so they won't be changed again. if other transaction changing it. 
just let it
+        // fail.
         List<TOlapTablePartition> partitions = Lists.newArrayList();
         List<TTabletLocation> tablets = Lists.newArrayList();
         PartitionInfo partitionInfo = olapTable.getPartitionInfo();
@@ -3647,7 +3657,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
 
    private TGetMetaResult getMetaImpl(TGetMetaRequest request, String clientIp)
             throws Exception {
-        //  Step 1: check fields
+        // Step 1: check fields
         if (!request.isSetUser()) {
             throw new UserException("user is not set");
         }
@@ -3720,7 +3730,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
     }
 
-
     @Override
     public TGetColumnInfoResult getColumnInfo(TGetColumnInfoRequest request) {
         TGetColumnInfoResult result = new TGetColumnInfoResult();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index da58b6e94ab..ac4a8558887 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -19,6 +19,7 @@ package org.apache.doris.tablefunction;
 
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.SchemaTable;
@@ -38,6 +39,9 @@ import org.apache.doris.job.extensions.mtmv.MTMVJob;
 import org.apache.doris.job.task.AbstractTask;
 import org.apache.doris.mtmv.MTMVPartitionUtil;
 import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.plsql.metastore.PlsqlManager;
+import org.apache.doris.plsql.metastore.PlsqlProcedureKey;
+import org.apache.doris.plsql.metastore.PlsqlStoredProcedure;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
@@ -92,6 +96,8 @@ public class MetadataGenerator {
 
     private static final ImmutableMap<String, Integer> WORKLOAD_GROUPS_COLUMN_TO_INDEX;
 
+    private static final ImmutableMap<String, Integer> ROUTINE_INFO_COLUMN_TO_INDEX;
+
     static {
         ImmutableMap.Builder<String, Integer> activeQueriesbuilder = new ImmutableMap.Builder();
         List<Column> activeQueriesColList = SchemaTable.TABLE_MAP.get("active_queries").getFullSchema();
@@ -105,6 +111,12 @@ public class MetadataGenerator {
             workloadGroupBuilder.put(WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i).toLowerCase(), i);
         }
         WORKLOAD_GROUPS_COLUMN_TO_INDEX = workloadGroupBuilder.build();
+
+        ImmutableMap.Builder<String, Integer> routineInfoBuilder = new ImmutableMap.Builder();
+        for (int i = 0; i < PlsqlManager.ROUTINE_INFO_TITLE_NAMES.size(); i++) {
+            routineInfoBuilder.put(PlsqlManager.ROUTINE_INFO_TITLE_NAMES.get(i).toLowerCase(), i);
+        }
+        ROUTINE_INFO_COLUMN_TO_INDEX = routineInfoBuilder.build();
     }
 
     public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableDataRequest request) throws TException {
@@ -176,6 +188,10 @@ public class MetadataGenerator {
                 result = workloadGroupsMetadataResult(schemaTableParams);
                 columnIndex = WORKLOAD_GROUPS_COLUMN_TO_INDEX;
                 break;
+            case ROUTINES_INFO:
+                result = routineInfoMetadataResult(schemaTableParams);
+                columnIndex = ROUTINE_INFO_COLUMN_TO_INDEX;
+                break;
             default:
                 return errorResult("invalid schema table name.");
         }
@@ -198,7 +214,7 @@ public class MetadataGenerator {
             return errorResult("Iceberg metadata params is not set.");
         }
 
-        TIcebergMetadataParams icebergMetadataParams =  params.getIcebergMetadataParams();
+        TIcebergMetadataParams icebergMetadataParams = params.getIcebergMetadataParams();
         TIcebergQueryType icebergQueryType = icebergMetadataParams.getIcebergQueryType();
         IcebergMetadataCache icebergMetadataCache = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache();
         List<TRow> dataBatch = Lists.newArrayList();
@@ -385,7 +401,7 @@ public class MetadataGenerator {
 
     private static TFetchSchemaTableDataResult catalogsMetadataResult(TMetadataTableRequestParams params) {
         TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
-        List<CatalogIf> info  = Env.getCurrentEnv().getCatalogMgr().listCatalogs();
+        List<CatalogIf> info = Env.getCurrentEnv().getCatalogMgr().listCatalogs();
         List<TRow> dataBatch = Lists.newArrayList();
 
         for (CatalogIf catalog : info) {
@@ -426,15 +442,15 @@ public class MetadataGenerator {
         List<TRow> dataBatch = Lists.newArrayList();
         for (List<String> rGroupsInfo : workloadGroupsInfo) {
             TRow trow = new TRow();
-            trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(0))));  // id
-            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(1)));             // name
+            trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(0)))); // id
+            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(1))); // name
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(2)))); // cpu_share
-            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(3)));             // mem_limit
-            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(4)));             // mem overcommit
+            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(3))); // mem_limit
+            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(4))); // mem overcommit
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(5)))); // max concurrent
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(6)))); // max queue size
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(7)))); // queue timeout
-            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(8)));             // cpu hard limit
+            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(8))); // cpu hard limit
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(9)))); // scan thread num
             // max remote scan thread num
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10))));
@@ -465,11 +481,11 @@ public class MetadataGenerator {
         List<TRow> dataBatch = Lists.newArrayList();
         for (List<String> policyRow : workloadPolicyList) {
             TRow trow = new TRow();
-            trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(policyRow.get(0))));    // id
-            trow.addToColumnValue(new TCell().setStringVal(policyRow.get(1)));                // name
-            trow.addToColumnValue(new TCell().setStringVal(policyRow.get(2)));                // condition
-            trow.addToColumnValue(new TCell().setStringVal(policyRow.get(3)));                // action
-            trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(4))));  // priority
+            trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(policyRow.get(0)))); // id
+            trow.addToColumnValue(new TCell().setStringVal(policyRow.get(1))); // name
+            trow.addToColumnValue(new TCell().setStringVal(policyRow.get(2))); // condition
+            trow.addToColumnValue(new TCell().setStringVal(policyRow.get(3))); // action
+            trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(4)))); // priority
             trow.addToColumnValue(new TCell().setBoolVal(Boolean.valueOf(policyRow.get(5)))); // enabled
             trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(6)))); // version
             dataBatch.add(trow);
@@ -562,7 +578,7 @@ public class MetadataGenerator {
            List<TFetchSchemaTableDataResult> relayResults = forwardToOtherFrontends(replayFetchSchemaTableReq);
             relayResults
                     .forEach(rs -> rs.getDataBatch()
-                        .forEach(row -> dataBatch.add(row)));
+                            .forEach(row -> dataBatch.add(row)));
         }
 
         result.setDataBatch(dataBatch);
@@ -791,4 +807,58 @@ public class MetadataGenerator {
         result.setStatus(new TStatus(TStatusCode.OK));
         return result;
     }
+
+    private static TFetchSchemaTableDataResult routineInfoMetadataResult(TSchemaTableRequestParams params) {
+        if (!params.isSetCurrentUserIdent()) {
+            return errorResult("current user ident is not set.");
+        }
+
+        PlsqlManager plSqlClient = Env.getCurrentEnv().getPlsqlManager();
+
+        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+        List<TRow> dataBatch = Lists.newArrayList();
+
+        Map<PlsqlProcedureKey, PlsqlStoredProcedure> allProc = plSqlClient.getAllPlsqlStoredProcedures();
+        for (Map.Entry<PlsqlProcedureKey, PlsqlStoredProcedure> entry : allProc.entrySet()) {
+            PlsqlStoredProcedure proc = entry.getValue();
+            TRow trow = new TRow();
+            trow.addToColumnValue(new TCell().setStringVal(proc.getName())); // SPECIFIC_NAME
+            trow.addToColumnValue(new TCell().setStringVal(Long.toString(proc.getCatalogId()))); // ROUTINE_CATALOG
+            CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(proc.getCatalogId());
+            if (catalog != null) {
+                DatabaseIf db = catalog.getDbNullable(proc.getDbId());
+                if (db != null) {
+                    trow.addToColumnValue(new TCell().setStringVal(db.getFullName())); // ROUTINE_SCHEMA
+                } else {
+                    trow.addToColumnValue(new TCell().setStringVal("")); // ROUTINE_SCHEMA
+                }
+            } else {
+                trow.addToColumnValue(new TCell().setStringVal("")); // ROUTINE_SCHEMA
+            }
+            trow.addToColumnValue(new TCell().setStringVal(proc.getName())); // ROUTINE_NAME
+            trow.addToColumnValue(new TCell().setStringVal("PROCEDURE")); // ROUTINE_TYPE
+            trow.addToColumnValue(new TCell().setStringVal("")); // DTD_IDENTIFIER
+            trow.addToColumnValue(new TCell().setStringVal(proc.getSource())); // ROUTINE_BODY
+            trow.addToColumnValue(new TCell().setStringVal("")); // ROUTINE_DEFINITION
+            trow.addToColumnValue(new TCell().setStringVal("NULL")); // EXTERNAL_NAME
+            trow.addToColumnValue(new TCell().setStringVal("")); // EXTERNAL_LANGUAGE
+            trow.addToColumnValue(new TCell().setStringVal("SQL")); // PARAMETER_STYLE
+            trow.addToColumnValue(new TCell().setStringVal("")); // IS_DETERMINISTIC
+            trow.addToColumnValue(new TCell().setStringVal("")); // SQL_DATA_ACCESS
+            trow.addToColumnValue(new TCell().setStringVal("NULL")); // SQL_PATH
+            trow.addToColumnValue(new TCell().setStringVal("DEFINER")); // SECURITY_TYPE
+            trow.addToColumnValue(new TCell().setStringVal(proc.getCreateTime())); // CREATED
+            trow.addToColumnValue(new TCell().setStringVal(proc.getModifyTime())); // LAST_ALTERED
+            trow.addToColumnValue(new TCell().setStringVal("")); // SQL_MODE
+            trow.addToColumnValue(new TCell().setStringVal("")); // ROUTINE_COMMENT
+            trow.addToColumnValue(new TCell().setStringVal(proc.getOwnerName())); // DEFINER
+            trow.addToColumnValue(new TCell().setStringVal("")); // CHARACTER_SET_CLIENT
+            trow.addToColumnValue(new TCell().setStringVal("")); // COLLATION_CONNECTION
+            trow.addToColumnValue(new TCell().setStringVal("")); // DATABASE_COLLATION
+            dataBatch.add(trow);
+        }
+        result.setDataBatch(dataBatch);
+        result.setStatus(new TStatus(TStatusCode.OK));
+        return result;
+    }
 }
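
Because routineInfoMetadataResult above fills ROUTINE_TYPE with 'PROCEDURE'
and SECURITY_TYPE with 'DEFINER' for every row, a filter on either column
should match all PL/SQL stored procedures; a sketch, assuming at least one
procedure exists:

    SELECT SPECIFIC_NAME, ROUTINE_SCHEMA, ROUTINE_BODY
    FROM information_schema.routines
    WHERE ROUTINE_TYPE = 'PROCEDURE' AND SECURITY_TYPE = 'DEFINER';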
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 1bd12c364bb..9f1c1371a0e 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -907,6 +907,7 @@ enum TSchemaTableName {
   METADATA_TABLE = 1, // tvf
   ACTIVE_QUERIES = 2, // db information_schema's table
   WORKLOAD_GROUPS = 3, // db information_schema's table
+  ROUTINES_INFO = 4, // db information_schema's table
 }
 
 struct TMetadataTableRequestParams {
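
With ROUTINES_INFO wired up as an information_schema table, a quick smoke
test after applying the patch might be (standard MySQL-compatible syntax):

    SHOW TABLES FROM information_schema LIKE 'routines';
    SELECT COUNT(*) FROM information_schema.routines;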

