This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 856c43b7b40 [feature](tvf) support query table value function (#34516)
856c43b7b40 is described below

commit 856c43b7b40129a0cc02d514e28146ca75c2164a
Author: zy-kkk <zhongy...@gmail.com>
AuthorDate: Fri May 10 10:13:30 2024 +0800

    [feature](tvf) support query table value function (#34516)
    
    This PR adds a table-valued function called `query`. By specifying
    `catalog` and `query`, it pushes a query directly to the catalog's data
    source for execution, without the query being parsed by Doris. Doris only
    receives the results returned by the query.
    Currently, only JDBC catalogs are supported.
    
    Example:
    
    ```
    Doris > desc function query('catalog' = 'mysql','query' = 'select count(*) as cnt from test.test');
    +-------+--------+------+------+---------+-------+
    | Field | Type   | Null | Key  | Default | Extra |
    +-------+--------+------+------+---------+-------+
    | cnt   | BIGINT | Yes  | true | NULL    | NONE  |
    +-------+--------+------+------+---------+-------+
    
    Doris > select * from query('catalog' = 'mysql','query' = 'select count(*) as cnt from test.test');
    +----------+
    | cnt      |
    +----------+
    | 30000000 |
    +----------+
    ```
---
 .../doris/catalog/BuiltinTableValuedFunctions.java |  4 +-
 .../doris/datasource/jdbc/JdbcExternalCatalog.java | 45 ++++++++---
 .../doris/datasource/jdbc/JdbcExternalTable.java   | 30 ++-----
 .../doris/datasource/jdbc/client/JdbcClient.java   | 54 +++++++++++++
 .../doris/datasource/jdbc/source/JdbcScanNode.java | 38 ++++++---
 .../trees/expressions/functions/table/Query.java   | 56 +++++++++++++
 .../visitor/TableValuedFunctionVisitor.java        |  5 ++
 .../tablefunction/JdbcQueryTableValueFunction.java | 58 ++++++++++++++
 .../tablefunction/QueryTableValueFunction.java     | 91 ++++++++++++++++++++++
 .../doris/tablefunction/TableValuedFunctionIf.java |  2 +
 .../external_table_p0/jdbc/test_jdbc_query_tvf.out | 44 +++++++++++
 .../jdbc/test_jdbc_query_tvf.groovy                | 49 ++++++++++++
 12 files changed, 432 insertions(+), 44 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
index 9986ce71885..3becd2e102b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
@@ -29,6 +29,7 @@ import 
org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
 import org.apache.doris.nereids.trees.expressions.functions.table.Local;
 import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
 import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
+import org.apache.doris.nereids.trees.expressions.functions.table.Query;
 import org.apache.doris.nereids.trees.expressions.functions.table.S3;
 import org.apache.doris.nereids.trees.expressions.functions.table.Tasks;
 
@@ -55,7 +56,8 @@ public class BuiltinTableValuedFunctions implements 
FunctionHelper {
             tableValued(S3.class, "s3"),
             tableValued(MvInfos.class, "mv_infos"),
             tableValued(Jobs.class, "jobs"),
-            tableValued(Tasks.class, "tasks")
+            tableValued(Tasks.class, "tasks"),
+            tableValued(Query.class, "query")
     );
 
     public static final BuiltinTableValuedFunctions INSTANCE = new 
BuiltinTableValuedFunctions();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
index fd0d966dd54..1a8cde4d03b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.jdbc;
 
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.JdbcResource;
 import org.apache.doris.catalog.JdbcTable;
@@ -288,6 +289,34 @@ public class JdbcExternalCatalog extends ExternalCatalog {
         jdbcClient.executeStmt(stmt);
     }
 
+    /**
+     * Get columns from query
+     *
+     * @param query, the query string
+     * @return the columns
+     */
+    public List<Column> getColumnsFromQuery(String query) {
+        makeSureInitialized();
+        return jdbcClient.getColumnsFromQuery(query);
+    }
+
+    public void configureJdbcTable(JdbcTable jdbcTable, String tableName) {
+        jdbcTable.setCatalogId(this.getId());
+        jdbcTable.setExternalTableName(tableName);
+        jdbcTable.setJdbcTypeName(this.getDatabaseTypeName());
+        jdbcTable.setJdbcUrl(this.getJdbcUrl());
+        jdbcTable.setJdbcUser(this.getJdbcUser());
+        jdbcTable.setJdbcPasswd(this.getJdbcPasswd());
+        jdbcTable.setDriverClass(this.getDriverClass());
+        jdbcTable.setDriverUrl(this.getDriverUrl());
+        jdbcTable.setResourceName(this.getResource());
+        jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize());
+        jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize());
+        
jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime());
+        
jdbcTable.setConnectionPoolMaxWaitTime(this.getConnectionPoolMaxWaitTime());
+        jdbcTable.setConnectionPoolKeepAlive(this.isConnectionPoolKeepAlive());
+    }
+
     private void testJdbcConnection(boolean isReplay) throws DdlException {
         if (FeConstants.runningUnitTest) {
             // skip test connection in unit test
@@ -352,19 +381,11 @@ public class JdbcExternalCatalog extends ExternalCatalog {
     private JdbcTable getTestConnectionJdbcTable() throws DdlException {
         JdbcTable jdbcTable = new JdbcTable(0, "test_jdbc_connection", 
Lists.newArrayList(),
                 TableType.JDBC_EXTERNAL_TABLE);
-        jdbcTable.setCatalogId(this.getId());
-        jdbcTable.setJdbcTypeName(this.getDatabaseTypeName());
-        jdbcTable.setJdbcUrl(this.getJdbcUrl());
-        jdbcTable.setJdbcUser(this.getJdbcUser());
-        jdbcTable.setJdbcPasswd(this.getJdbcPasswd());
-        jdbcTable.setDriverClass(this.getDriverClass());
-        jdbcTable.setDriverUrl(this.getDriverUrl());
+        this.configureJdbcTable(jdbcTable, "test_jdbc_connection");
+
+        // Special checksum computation
         
jdbcTable.setCheckSum(JdbcResource.computeObjectChecksum(this.getDriverUrl()));
-        jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize());
-        jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize());
-        
jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime());
-        
jdbcTable.setConnectionPoolMaxWaitTime(this.getConnectionPoolMaxWaitTime());
-        jdbcTable.setConnectionPoolKeepAlive(this.isConnectionPoolKeepAlive());
+
         return jdbcTable;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
index 242b973b87e..b31fc5c24a9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
@@ -33,7 +33,7 @@ import java.util.List;
 import java.util.Optional;
 
 /**
- * Elasticsearch external table.
+ * Jdbc external table.
  */
 public class JdbcExternalTable extends ExternalTable {
     private static final Logger LOG = 
LogManager.getLogger(JdbcExternalTable.class);
@@ -83,27 +83,13 @@ public class JdbcExternalTable extends ExternalTable {
         JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog;
         String fullDbName = this.dbName + "." + this.name;
         JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema, 
TableType.JDBC_EXTERNAL_TABLE);
-        jdbcTable.setCatalogId(jdbcCatalog.getId());
-        jdbcTable.setExternalTableName(fullDbName);
-        jdbcTable.setRemoteDatabaseName(
-                ((JdbcExternalCatalog) 
catalog).getJdbcClient().getRemoteDatabaseName(this.dbName));
-        jdbcTable.setRemoteTableName(
-                ((JdbcExternalCatalog) 
catalog).getJdbcClient().getRemoteTableName(this.dbName, this.name));
-        jdbcTable.setRemoteColumnNames(((JdbcExternalCatalog) 
catalog).getJdbcClient().getRemoteColumnNames(this.dbName,
-                this.name));
-        jdbcTable.setJdbcTypeName(jdbcCatalog.getDatabaseTypeName());
-        jdbcTable.setJdbcUrl(jdbcCatalog.getJdbcUrl());
-        jdbcTable.setJdbcUser(jdbcCatalog.getJdbcUser());
-        jdbcTable.setJdbcPasswd(jdbcCatalog.getJdbcPasswd());
-        jdbcTable.setDriverClass(jdbcCatalog.getDriverClass());
-        jdbcTable.setDriverUrl(jdbcCatalog.getDriverUrl());
-        jdbcTable.setResourceName(jdbcCatalog.getResource());
-        jdbcTable.setCheckSum(jdbcCatalog.getCheckSum());
-        
jdbcTable.setConnectionPoolMinSize(jdbcCatalog.getConnectionPoolMinSize());
-        
jdbcTable.setConnectionPoolMaxSize(jdbcCatalog.getConnectionPoolMaxSize());
-        
jdbcTable.setConnectionPoolMaxLifeTime(jdbcCatalog.getConnectionPoolMaxLifeTime());
-        
jdbcTable.setConnectionPoolMaxWaitTime(jdbcCatalog.getConnectionPoolMaxWaitTime());
-        
jdbcTable.setConnectionPoolKeepAlive(jdbcCatalog.isConnectionPoolKeepAlive());
+        jdbcCatalog.configureJdbcTable(jdbcTable, fullDbName);
+
+        // Set remote properties
+        
jdbcTable.setRemoteDatabaseName(jdbcCatalog.getJdbcClient().getRemoteDatabaseName(this.dbName));
+        
jdbcTable.setRemoteTableName(jdbcCatalog.getJdbcClient().getRemoteTableName(this.dbName,
 this.name));
+        
jdbcTable.setRemoteColumnNames(jdbcCatalog.getJdbcClient().getRemoteColumnNames(this.dbName,
 this.name));
+
         return jdbcTable;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
index b83f0263e1e..5566d651d94 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
@@ -39,7 +39,9 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collections;
@@ -214,6 +216,58 @@ public abstract class JdbcClient {
         }
     }
 
+    /**
+     * Execute query via jdbc
+     *
+     * @param query, the query string
+     * @return List<Column>
+     */
+    public List<Column> getColumnsFromQuery(String query) {
+        Connection conn = getConnection();
+        List<Column> columns = Lists.newArrayList();
+        try {
+            PreparedStatement pstmt = conn.prepareStatement(query);
+            ResultSetMetaData metaData = pstmt.getMetaData();
+            if (metaData == null) {
+                throw new JdbcClientException("Query not supported: Failed to 
get ResultSetMetaData from query: %s",
+                        query);
+            } else {
+                List<JdbcFieldSchema> schemas = 
getSchemaFromResultSetMetaData(metaData);
+                for (JdbcFieldSchema schema : schemas) {
+                    columns.add(new Column(schema.getColumnName(), 
jdbcTypeToDoris(schema), true, null, true, null,
+                            true, -1));
+                }
+            }
+        } catch (SQLException e) {
+            throw new JdbcClientException("Failed to get columns from query: 
%s", e, query);
+        } finally {
+            close(conn);
+        }
+        return columns;
+    }
+
+
+    /**
+     * Get schema from ResultSetMetaData
+     *
+     * @param metaData, the ResultSetMetaData
+     * @return List<JdbcFieldSchema>
+     */
+    public List<JdbcFieldSchema> 
getSchemaFromResultSetMetaData(ResultSetMetaData metaData) throws SQLException {
+        List<JdbcFieldSchema> schemas = Lists.newArrayList();
+        for (int i = 1; i <= metaData.getColumnCount(); i++) {
+            JdbcFieldSchema field = new JdbcFieldSchema();
+            field.setColumnName(metaData.getColumnName(i));
+            field.setDataType(metaData.getColumnType(i));
+            field.setDataTypeName(metaData.getColumnTypeName(i));
+            field.setColumnSize(metaData.getColumnDisplaySize(i));
+            field.setDecimalDigits(metaData.getScale(i));
+            field.setNumPrecRadix(metaData.getPrecision(i));
+            schemas.add(field);
+        }
+        return schemas;
+    }
+
     // This part used to process meta-information of database, table and 
column.
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
index f7fccf6d717..58ab0f9d226 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
@@ -69,6 +69,8 @@ public class JdbcScanNode extends ExternalScanNode {
     private String tableName;
     private TOdbcTableType jdbcType;
     private String graphQueryString = "";
+    private boolean isTableValuedFunction = false;
+    private String query = "";
 
     private JdbcTable tbl;
 
@@ -84,6 +86,15 @@ public class JdbcScanNode extends ExternalScanNode {
         tableName = tbl.getProperRemoteFullTableName(jdbcType);
     }
 
+    public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
isTableValuedFunction, String query) {
+        super(id, desc, "JdbcScanNode", StatisticalType.JDBC_SCAN_NODE, false);
+        this.isTableValuedFunction = isTableValuedFunction;
+        this.query = query;
+        tbl = (JdbcTable) desc.getTable();
+        jdbcType = tbl.getJdbcTableType();
+        tableName = tbl.getExternalTableName();
+    }
+
     @Override
     public void init(Analyzer analyzer) throws UserException {
         super.init(analyzer);
@@ -232,14 +243,19 @@ public class JdbcScanNode extends ExternalScanNode {
     @Override
     public String getNodeExplainString(String prefix, TExplainLevel 
detailLevel) {
         StringBuilder output = new StringBuilder();
-        output.append(prefix).append("TABLE: ").append(tableName).append("\n");
-        if (detailLevel == TExplainLevel.BRIEF) {
-            return output.toString();
-        }
-        output.append(prefix).append("QUERY: 
").append(getJdbcQueryStr()).append("\n");
-        if (!conjuncts.isEmpty()) {
-            Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
-            output.append(prefix).append("PREDICATES: 
").append(expr.toSql()).append("\n");
+        if (isTableValuedFunction) {
+            output.append(prefix).append("TABLE VALUE FUNCTION\n");
+            output.append(prefix).append("QUERY: ").append(query).append("\n");
+        } else {
+            output.append(prefix).append("TABLE: 
").append(tableName).append("\n");
+            if (detailLevel == TExplainLevel.BRIEF) {
+                return output.toString();
+            }
+            output.append(prefix).append("QUERY: 
").append(getJdbcQueryStr()).append("\n");
+            if (!conjuncts.isEmpty()) {
+                Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
+                output.append(prefix).append("PREDICATES: 
").append(expr.toSql()).append("\n");
+            }
         }
         return output.toString();
     }
@@ -286,7 +302,11 @@ public class JdbcScanNode extends ExternalScanNode {
         msg.jdbc_scan_node = new TJdbcScanNode();
         msg.jdbc_scan_node.setTupleId(desc.getId().asInt());
         msg.jdbc_scan_node.setTableName(tableName);
-        msg.jdbc_scan_node.setQueryString(getJdbcQueryStr());
+        if (isTableValuedFunction) {
+            msg.jdbc_scan_node.setQueryString(query);
+        } else {
+            msg.jdbc_scan_node.setQueryString(getJdbcQueryStr());
+        }
         msg.jdbc_scan_node.setTableType(jdbcType);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Query.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Query.java
new file mode 100644
index 00000000000..4c379c7e46d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Query.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.table;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.coercion.AnyDataType;
+import org.apache.doris.tablefunction.QueryTableValueFunction;
+import org.apache.doris.tablefunction.TableValuedFunctionIf;
+
+import java.util.Map;
+
+/** query */
+public class Query extends TableValuedFunction {
+    public Query(Properties properties) {
+        super("query", properties);
+    }
+
+    @Override
+    public FunctionSignature customSignature() {
+        return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, 
getArgumentsTypes());
+    }
+
+    @Override
+    protected TableValuedFunctionIf toCatalogFunction() {
+        try {
+            Map<String, String> arguments = getTVFProperties().getMap();
+            return 
QueryTableValueFunction.createQueryTableValueFunction(arguments);
+        } catch (Throwable t) {
+            throw new AnalysisException("Can not build 
QueryTableValuedFunction by "
+                + this + ": " + t.getMessage(), t);
+        }
+    }
+
+    @Override
+    public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+        return visitor.visitQuery(this, context);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
index d0c76d143a2..bd09a81b011 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
@@ -29,6 +29,7 @@ import 
org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
 import org.apache.doris.nereids.trees.expressions.functions.table.Local;
 import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
 import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
+import org.apache.doris.nereids.trees.expressions.functions.table.Query;
 import org.apache.doris.nereids.trees.expressions.functions.table.S3;
 import 
org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
 import org.apache.doris.nereids.trees.expressions.functions.table.Tasks;
@@ -92,4 +93,8 @@ public interface TableValuedFunctionVisitor<R, C> {
     default R visitS3(S3 s3, C context) {
         return visitTableValuedFunction(s3, context);
     }
+
+    default R visitQuery(Query query, C context) {
+        return visitTableValuedFunction(query, context);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
new file mode 100644
index 00000000000..b884dab3882
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.JdbcTable;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
+import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanNode;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+public class JdbcQueryTableValueFunction extends QueryTableValueFunction {
+    public static final Logger LOG = 
LogManager.getLogger(JdbcQueryTableValueFunction.class);
+
+    public JdbcQueryTableValueFunction(Map<String, String> params) throws 
AnalysisException {
+        super(params);
+    }
+
+    @Override
+    public List<Column> getTableColumns() throws AnalysisException {
+        JdbcExternalCatalog catalog = (JdbcExternalCatalog) catalogIf;
+        return catalog.getColumnsFromQuery(query);
+    }
+
+    @Override
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+        JdbcExternalCatalog catalog = (JdbcExternalCatalog) catalogIf;
+        JdbcTable jdbcTable = new JdbcTable(1, desc.getTable().getName(), 
desc.getTable().getFullSchema(),
+                TableType.JDBC);
+        catalog.configureJdbcTable(jdbcTable, desc.getTable().getName());
+        desc.setTable(jdbcTable);
+        return new JdbcScanNode(id, desc, true, query);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
new file mode 100644
index 00000000000..3865d0b25c6
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.ConnectContext;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class QueryTableValueFunction extends TableValuedFunctionIf {
+    public static final Logger LOG = 
LogManager.getLogger(QueryTableValueFunction.class);
+    public static final String NAME = "query";
+    private static final String CATALOG = "catalog";
+    private static final String QUERY = "query";
+    protected CatalogIf catalogIf;
+    protected final String query;
+
+    public QueryTableValueFunction(Map<String, String> params) throws 
AnalysisException {
+        if (params.size() != 2) {
+            throw new AnalysisException("Query TableValueFunction must have 2 
arguments: 'catalog' and 'query'");
+        }
+        if (!params.containsKey(CATALOG) || !params.containsKey(QUERY)) {
+            throw new AnalysisException("Query TableValueFunction must have 2 
arguments: 'catalog' and 'query'");
+        }
+        String catalogName = params.get(CATALOG);
+        this.query = params.get(QUERY);
+        this.catalogIf = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
+    }
+
+    public static QueryTableValueFunction 
createQueryTableValueFunction(Map<String, String> params)
+            throws AnalysisException {
+        String catalogName = params.get(CATALOG);
+
+        // check priv
+        UserIdentity userIdentity = 
ConnectContext.get().getCurrentUserIdentity();
+        if (!Env.getCurrentEnv().getAuth().checkCtlPriv(userIdentity, 
catalogName, PrivPredicate.SELECT)) {
+            throw new org.apache.doris.nereids.exceptions.AnalysisException(
+                    "user " + userIdentity + " has no privilege to query in 
catalog " + catalogName);
+        }
+
+        CatalogIf catalogIf = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
+        if (catalogIf == null) {
+            throw new AnalysisException("Catalog not found: " + catalogName);
+        }
+        if (catalogIf instanceof JdbcExternalCatalog) {
+            return new JdbcQueryTableValueFunction(params);
+        } else {
+            throw new AnalysisException(
+                    "Catalog not supported query tvf: " + catalogName + ", 
catalog type:" + catalogIf.getType());
+        }
+    }
+
+    @Override
+    public String getTableName() {
+        return "QueryTableValueFunction";
+    }
+
+    @Override
+    public abstract List<Column> getTableColumns() throws AnalysisException;
+
+    @Override
+    public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc);
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index 41ed6e14cb2..c99da94de0f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -72,6 +72,8 @@ public abstract class TableValuedFunctionIf {
                 return new TasksTableValuedFunction(params);
             case GroupCommitTableValuedFunction.NAME:
                 return new GroupCommitTableValuedFunction(params);
+            case QueryTableValueFunction.NAME:
+                return 
QueryTableValueFunction.createQueryTableValueFunction(params);
             default:
                 throw new AnalysisException("Could not find table function " + 
funcName);
         }
diff --git 
a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_tvf.out 
b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_tvf.out
new file mode 100644
index 00000000000..27110566d7a
--- /dev/null
+++ b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_tvf.out
@@ -0,0 +1,44 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+bigint BIGINT  Yes     true    \N      NONE
+bigint_u       LARGEINT        Yes     true    \N      NONE
+binary TEXT    Yes     true    \N      NONE
+bit    TEXT    Yes     true    \N      NONE
+blob   TEXT    Yes     true    \N      NONE
+boolean        TINYINT Yes     true    \N      NONE
+char   CHAR(6) Yes     true    \N      NONE
+date   DATE    Yes     true    \N      NONE
+datetime       DATETIME        Yes     true    \N      NONE
+decimal        DECIMAL(12, 4)  Yes     true    \N      NONE
+decimal_u      DECIMAL(19, 5)  Yes     true    \N      NONE
+double DOUBLE  Yes     true    \N      NONE
+double_u       DOUBLE  Yes     true    \N      NONE
+enum   CHAR(6) Yes     true    \N      NONE
+float  FLOAT   Yes     true    \N      NONE
+float_u        FLOAT   Yes     true    \N      NONE
+int    INT     Yes     true    \N      NONE
+int_u  BIGINT  Yes     true    \N      NONE
+json   TEXT    Yes     true    \N      NONE
+mediumint      INT     Yes     true    \N      NONE
+mediumint_u    INT     Yes     true    \N      NONE
+set    CHAR(6) Yes     true    \N      NONE
+smallint       SMALLINT        Yes     true    \N      NONE
+smallint_u     INT     Yes     true    \N      NONE
+text   TEXT    Yes     true    \N      NONE
+time   TEXT    Yes     true    \N      NONE
+timestamp      DATETIME(4)     Yes     true    \N      NONE
+tinyint        TINYINT Yes     true    \N      NONE
+tinyint_u      SMALLINT        Yes     true    \N      NONE
+varbinary      TEXT    Yes     true    \N      NONE
+varchar        VARCHAR(10)     Yes     true    \N      NONE
+year   SMALLINT        Yes     true    \N      NONE
+
+-- !sql --
+\N     302     \N      502     602     4.14159 \N      6.14159 \N      -124    
-302    2013    -402    -502    -602    \N      2012-10-26T02:08:39.345700      
2013-10-26T08:09:18     -5.14145        \N      -7.1400 row2    \N      
09:11:09.567    text2   0xE86F6C6C6F20576F726C67        \N      \N      0x2F    
\N      0x88656C6C9F    Value3
+201    301     401     501     601     3.14159 4.1415926       5.14159 1       
-123    -301    2012    -401    -501    -601    2012-10-30      
2012-10-25T12:05:36.345700      2012-10-25T08:08:08     -4.14145        
-5.1400000001   -6.1400 row1    line1   09:09:09.567    text1   
0x48656C6C6F20576F726C64        {"age": 30, "city": "London", "name": "Alice"}  
Option1,Option3 0x2A    0x48656C6C6F00000000000000      0x48656C6C6F    Value2
+202    302     402     502     602     4.14159 5.1415926       6.14159 0       
-124    -302    2013    -402    -502    -602    2012-11-01      
2012-10-26T02:08:39.345700      2013-10-26T08:09:18     -5.14145        
-6.1400000001   -7.1400 row2    line2   09:11:09.567    text2   
0xE86F6C6C6F20576F726C67        {"age": 18, "city": "ChongQing", "name": 
"Gaoxin"}      Option1,Option2 0x2F    0x58676C6C6F00000000000000      
0x88656C6C9F    Value3
+203    303     403     503     603     7.14159 8.1415926       9.14159 0       
\N      -402    2017    -602    -902    -1102   2012-11-02      \N      
2013-10-27T08:11:18     -5.14145        -6.1400000000001        -7.1400 row3    
line3   09:11:09.567    text3   0xE86F6C6C6F20576F726C67        {"age": 24, 
"city": "ChongQing", "name": "ChenQi"}      Option2 0x2F    
0x58676C6C6F00000000000000      \N      Value1
+
+-- !sql --
+4
+
diff --git 
a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_tvf.groovy 
b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_tvf.groovy
new file mode 100644
index 00000000000..dcf36554be1
--- /dev/null
+++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_tvf.groovy
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_jdbc_query_tvf") {
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String s3_endpoint = getS3Endpoint()
+    String bucket = getS3BucketName()
+    String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-java-8.0.25.jar";
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String user = "test_jdbc_user";
+        String pwd = '123456';
+        String catalog_name = "mysql_jdbc_catalog";
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+
+        sql """drop catalog if exists ${catalog_name} """
+
+        sql """create catalog if not exists ${catalog_name} properties(
+            "type"="jdbc",
+            "user"="root",
+            "password"="123456",
+            "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test",
+            "driver_url" = "${driver_url}",
+            "driver_class" = "com.mysql.cj.jdbc.Driver"
+        );"""
+
+        order_qt_sql """desc function query('catalog' = '${catalog_name}', 
'query' = 'select * from doris_test.all_types') """
+        order_qt_sql """select * from query('catalog' = '${catalog_name}', 
'query' = 'select * from doris_test.all_types') """
+        order_qt_sql """select * from query('catalog' = '${catalog_name}', 
'query' = 'select count(*) as cnt from doris_test.all_types') """
+
+//         sql """drop catalog if exists ${catalog_name} """
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to