This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 856c43b7b40 [feature](tvf) support query table value function (#34516) 856c43b7b40 is described below commit 856c43b7b40129a0cc02d514e28146ca75c2164a Author: zy-kkk <zhongy...@gmail.com> AuthorDate: Fri May 10 10:13:30 2024 +0800 [feature](tvf) support query table value function (#34516) This PR supports a Table Value Function called `Query`. He can push a query directly to the catalog source for execution by specifying `catalog` and `query` without parsing by Doris. Doris only receives the results returned by the query. Currently only JDBC Catalog is supported. Example: ``` Doris > desc function query('catalog' = 'mysql','query' = 'select count(*) as cnt from test.test'); +-------+--------+------+------+---------+-------+ | Field | Type | Null | Key | Default | Extra | +-------+--------+------+------+---------+-------+ | cnt | BIGINT | Yes | true | NULL | NONE | +-------+--------+------+------+---------+-------+ Doris > select * from query('catalog' = 'mysql','query' = 'select count(*) as cnt from test.test'); +----------+ | cnt | +----------+ | 30000000 | +----------+ ``` --- .../doris/catalog/BuiltinTableValuedFunctions.java | 4 +- .../doris/datasource/jdbc/JdbcExternalCatalog.java | 45 ++++++++--- .../doris/datasource/jdbc/JdbcExternalTable.java | 30 ++----- .../doris/datasource/jdbc/client/JdbcClient.java | 54 +++++++++++++ .../doris/datasource/jdbc/source/JdbcScanNode.java | 38 ++++++--- .../trees/expressions/functions/table/Query.java | 56 +++++++++++++ .../visitor/TableValuedFunctionVisitor.java | 5 ++ .../tablefunction/JdbcQueryTableValueFunction.java | 58 ++++++++++++++ .../tablefunction/QueryTableValueFunction.java | 91 ++++++++++++++++++++++ .../doris/tablefunction/TableValuedFunctionIf.java | 2 + .../external_table_p0/jdbc/test_jdbc_query_tvf.out | 44 +++++++++++ .../jdbc/test_jdbc_query_tvf.groovy | 49 ++++++++++++ 12 files changed, 432 insertions(+), 44 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index 9986ce71885..3becd2e102b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Jobs; import org.apache.doris.nereids.trees.expressions.functions.table.Local; import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; +import org.apache.doris.nereids.trees.expressions.functions.table.Query; import org.apache.doris.nereids.trees.expressions.functions.table.S3; import org.apache.doris.nereids.trees.expressions.functions.table.Tasks; @@ -55,7 +56,8 @@ public class BuiltinTableValuedFunctions implements FunctionHelper { tableValued(S3.class, "s3"), tableValued(MvInfos.class, "mv_infos"), tableValued(Jobs.class, "jobs"), - tableValued(Tasks.class, "tasks") + tableValued(Tasks.class, "tasks"), + tableValued(Query.class, "query") ); public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java index fd0d966dd54..1a8cde4d03b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.jdbc; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.JdbcResource; import org.apache.doris.catalog.JdbcTable; @@ -288,6 +289,34 @@ public class JdbcExternalCatalog extends ExternalCatalog { jdbcClient.executeStmt(stmt); } + /** + * Get columns from query + * + * @param query, the query string + * @return the columns + */ + public List<Column> getColumnsFromQuery(String query) { + makeSureInitialized(); + return jdbcClient.getColumnsFromQuery(query); + } + + public void configureJdbcTable(JdbcTable jdbcTable, String tableName) { + jdbcTable.setCatalogId(this.getId()); + jdbcTable.setExternalTableName(tableName); + jdbcTable.setJdbcTypeName(this.getDatabaseTypeName()); + jdbcTable.setJdbcUrl(this.getJdbcUrl()); + jdbcTable.setJdbcUser(this.getJdbcUser()); + jdbcTable.setJdbcPasswd(this.getJdbcPasswd()); + jdbcTable.setDriverClass(this.getDriverClass()); + jdbcTable.setDriverUrl(this.getDriverUrl()); + jdbcTable.setResourceName(this.getResource()); + jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize()); + jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize()); + jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime()); + jdbcTable.setConnectionPoolMaxWaitTime(this.getConnectionPoolMaxWaitTime()); + jdbcTable.setConnectionPoolKeepAlive(this.isConnectionPoolKeepAlive()); + } + private void testJdbcConnection(boolean isReplay) throws DdlException { if (FeConstants.runningUnitTest) { // skip test connection in unit test @@ -352,19 +381,11 @@ public class JdbcExternalCatalog extends ExternalCatalog { private JdbcTable getTestConnectionJdbcTable() throws DdlException { JdbcTable jdbcTable = new JdbcTable(0, "test_jdbc_connection", Lists.newArrayList(), TableType.JDBC_EXTERNAL_TABLE); - jdbcTable.setCatalogId(this.getId()); - jdbcTable.setJdbcTypeName(this.getDatabaseTypeName()); - jdbcTable.setJdbcUrl(this.getJdbcUrl()); - jdbcTable.setJdbcUser(this.getJdbcUser()); - jdbcTable.setJdbcPasswd(this.getJdbcPasswd()); - jdbcTable.setDriverClass(this.getDriverClass()); - jdbcTable.setDriverUrl(this.getDriverUrl()); + this.configureJdbcTable(jdbcTable, "test_jdbc_connection"); + + // Special checksum computation jdbcTable.setCheckSum(JdbcResource.computeObjectChecksum(this.getDriverUrl())); - jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize()); - jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize()); - jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime()); - jdbcTable.setConnectionPoolMaxWaitTime(this.getConnectionPoolMaxWaitTime()); - jdbcTable.setConnectionPoolKeepAlive(this.isConnectionPoolKeepAlive()); + return jdbcTable; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java index 242b973b87e..b31fc5c24a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java @@ -33,7 +33,7 @@ import java.util.List; import java.util.Optional; /** - * Elasticsearch external table. + * Jdbc external table. */ public class JdbcExternalTable extends ExternalTable { private static final Logger LOG = LogManager.getLogger(JdbcExternalTable.class); @@ -83,27 +83,13 @@ public class JdbcExternalTable extends ExternalTable { JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog; String fullDbName = this.dbName + "." + this.name; JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema, TableType.JDBC_EXTERNAL_TABLE); - jdbcTable.setCatalogId(jdbcCatalog.getId()); - jdbcTable.setExternalTableName(fullDbName); - jdbcTable.setRemoteDatabaseName( - ((JdbcExternalCatalog) catalog).getJdbcClient().getRemoteDatabaseName(this.dbName)); - jdbcTable.setRemoteTableName( - ((JdbcExternalCatalog) catalog).getJdbcClient().getRemoteTableName(this.dbName, this.name)); - jdbcTable.setRemoteColumnNames(((JdbcExternalCatalog) catalog).getJdbcClient().getRemoteColumnNames(this.dbName, - this.name)); - jdbcTable.setJdbcTypeName(jdbcCatalog.getDatabaseTypeName()); - jdbcTable.setJdbcUrl(jdbcCatalog.getJdbcUrl()); - jdbcTable.setJdbcUser(jdbcCatalog.getJdbcUser()); - jdbcTable.setJdbcPasswd(jdbcCatalog.getJdbcPasswd()); - jdbcTable.setDriverClass(jdbcCatalog.getDriverClass()); - jdbcTable.setDriverUrl(jdbcCatalog.getDriverUrl()); - jdbcTable.setResourceName(jdbcCatalog.getResource()); - jdbcTable.setCheckSum(jdbcCatalog.getCheckSum()); - jdbcTable.setConnectionPoolMinSize(jdbcCatalog.getConnectionPoolMinSize()); - jdbcTable.setConnectionPoolMaxSize(jdbcCatalog.getConnectionPoolMaxSize()); - jdbcTable.setConnectionPoolMaxLifeTime(jdbcCatalog.getConnectionPoolMaxLifeTime()); - jdbcTable.setConnectionPoolMaxWaitTime(jdbcCatalog.getConnectionPoolMaxWaitTime()); - jdbcTable.setConnectionPoolKeepAlive(jdbcCatalog.isConnectionPoolKeepAlive()); + jdbcCatalog.configureJdbcTable(jdbcTable, fullDbName); + + // Set remote properties + jdbcTable.setRemoteDatabaseName(jdbcCatalog.getJdbcClient().getRemoteDatabaseName(this.dbName)); + jdbcTable.setRemoteTableName(jdbcCatalog.getJdbcClient().getRemoteTableName(this.dbName, this.name)); + jdbcTable.setRemoteColumnNames(jdbcCatalog.getJdbcClient().getRemoteColumnNames(this.dbName, this.name)); + return jdbcTable; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java index b83f0263e1e..5566d651d94 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java @@ -39,7 +39,9 @@ import java.net.URL; import java.net.URLClassLoader; import java.sql.Connection; import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.Collections; @@ -214,6 +216,58 @@ public abstract class JdbcClient { } } + /** + * Execute query via jdbc + * + * @param query, the query string + * @return List<Column> + */ + public List<Column> getColumnsFromQuery(String query) { + Connection conn = getConnection(); + List<Column> columns = Lists.newArrayList(); + try { + PreparedStatement pstmt = conn.prepareStatement(query); + ResultSetMetaData metaData = pstmt.getMetaData(); + if (metaData == null) { + throw new JdbcClientException("Query not supported: Failed to get ResultSetMetaData from query: %s", + query); + } else { + List<JdbcFieldSchema> schemas = getSchemaFromResultSetMetaData(metaData); + for (JdbcFieldSchema schema : schemas) { + columns.add(new Column(schema.getColumnName(), jdbcTypeToDoris(schema), true, null, true, null, + true, -1)); + } + } + } catch (SQLException e) { + throw new JdbcClientException("Failed to get columns from query: %s", e, query); + } finally { + close(conn); + } + return columns; + } + + + /** + * Get schema from ResultSetMetaData + * + * @param metaData, the ResultSetMetaData + * @return List<JdbcFieldSchema> + */ + public List<JdbcFieldSchema> getSchemaFromResultSetMetaData(ResultSetMetaData metaData) throws SQLException { + List<JdbcFieldSchema> schemas = Lists.newArrayList(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + JdbcFieldSchema field = new JdbcFieldSchema(); + field.setColumnName(metaData.getColumnName(i)); + field.setDataType(metaData.getColumnType(i)); + field.setDataTypeName(metaData.getColumnTypeName(i)); + field.setColumnSize(metaData.getColumnDisplaySize(i)); + field.setDecimalDigits(metaData.getScale(i)); + field.setNumPrecRadix(metaData.getPrecision(i)); + schemas.add(field); + } + return schemas; + } + // This part used to process meta-information of database, table and column. /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java index f7fccf6d717..58ab0f9d226 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java @@ -69,6 +69,8 @@ public class JdbcScanNode extends ExternalScanNode { private String tableName; private TOdbcTableType jdbcType; private String graphQueryString = ""; + private boolean isTableValuedFunction = false; + private String query = ""; private JdbcTable tbl; @@ -84,6 +86,15 @@ public class JdbcScanNode extends ExternalScanNode { tableName = tbl.getProperRemoteFullTableName(jdbcType); } + public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean isTableValuedFunction, String query) { + super(id, desc, "JdbcScanNode", StatisticalType.JDBC_SCAN_NODE, false); + this.isTableValuedFunction = isTableValuedFunction; + this.query = query; + tbl = (JdbcTable) desc.getTable(); + jdbcType = tbl.getJdbcTableType(); + tableName = tbl.getExternalTableName(); + } + @Override public void init(Analyzer analyzer) throws UserException { super.init(analyzer); @@ -232,14 +243,19 @@ public class JdbcScanNode extends ExternalScanNode { @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { StringBuilder output = new StringBuilder(); - output.append(prefix).append("TABLE: ").append(tableName).append("\n"); - if (detailLevel == TExplainLevel.BRIEF) { - return output.toString(); - } - output.append(prefix).append("QUERY: ").append(getJdbcQueryStr()).append("\n"); - if (!conjuncts.isEmpty()) { - Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts); - output.append(prefix).append("PREDICATES: ").append(expr.toSql()).append("\n"); + if (isTableValuedFunction) { + output.append(prefix).append("TABLE VALUE FUNCTION\n"); + output.append(prefix).append("QUERY: ").append(query).append("\n"); + } else { + output.append(prefix).append("TABLE: ").append(tableName).append("\n"); + if (detailLevel == TExplainLevel.BRIEF) { + return output.toString(); + } + output.append(prefix).append("QUERY: ").append(getJdbcQueryStr()).append("\n"); + if (!conjuncts.isEmpty()) { + Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts); + output.append(prefix).append("PREDICATES: ").append(expr.toSql()).append("\n"); + } } return output.toString(); } @@ -286,7 +302,11 @@ public class JdbcScanNode extends ExternalScanNode { msg.jdbc_scan_node = new TJdbcScanNode(); msg.jdbc_scan_node.setTupleId(desc.getId().asInt()); msg.jdbc_scan_node.setTableName(tableName); - msg.jdbc_scan_node.setQueryString(getJdbcQueryStr()); + if (isTableValuedFunction) { + msg.jdbc_scan_node.setQueryString(query); + } else { + msg.jdbc_scan_node.setQueryString(getJdbcQueryStr()); + } msg.jdbc_scan_node.setTableType(jdbcType); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Query.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Query.java new file mode 100644 index 00000000000..4c379c7e46d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Query.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.QueryTableValueFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** query */ +public class Query extends TableValuedFunction { + public Query(Properties properties) { + super("query", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map<String, String> arguments = getTVFProperties().getMap(); + return QueryTableValueFunction.createQueryTableValueFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build QueryTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) { + return visitor.visitQuery(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index d0c76d143a2..bd09a81b011 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Jobs; import org.apache.doris.nereids.trees.expressions.functions.table.Local; import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; +import org.apache.doris.nereids.trees.expressions.functions.table.Query; import org.apache.doris.nereids.trees.expressions.functions.table.S3; import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction; import org.apache.doris.nereids.trees.expressions.functions.table.Tasks; @@ -92,4 +93,8 @@ public interface TableValuedFunctionVisitor<R, C> { default R visitS3(S3 s3, C context) { return visitTableValuedFunction(s3, context); } + + default R visitQuery(Query query, C context) { + return visitTableValuedFunction(query, context); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java new file mode 100644 index 00000000000..b884dab3882 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.JdbcTable; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.jdbc.JdbcExternalCatalog; +import org.apache.doris.datasource.jdbc.source.JdbcScanNode; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.ScanNode; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; + +public class JdbcQueryTableValueFunction extends QueryTableValueFunction { + public static final Logger LOG = LogManager.getLogger(JdbcQueryTableValueFunction.class); + + public JdbcQueryTableValueFunction(Map<String, String> params) throws AnalysisException { + super(params); + } + + @Override + public List<Column> getTableColumns() throws AnalysisException { + JdbcExternalCatalog catalog = (JdbcExternalCatalog) catalogIf; + return catalog.getColumnsFromQuery(query); + } + + @Override + public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) { + JdbcExternalCatalog catalog = (JdbcExternalCatalog) catalogIf; + JdbcTable jdbcTable = new JdbcTable(1, desc.getTable().getName(), desc.getTable().getFullSchema(), + TableType.JDBC); + catalog.configureJdbcTable(jdbcTable, desc.getTable().getName()); + desc.setTable(jdbcTable); + return new JdbcScanNode(id, desc, true, query); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java new file mode 100644 index 00000000000..3865d0b25c6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.jdbc.JdbcExternalCatalog; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.ScanNode; +import org.apache.doris.qe.ConnectContext; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; + +public abstract class QueryTableValueFunction extends TableValuedFunctionIf { + public static final Logger LOG = LogManager.getLogger(QueryTableValueFunction.class); + public static final String NAME = "query"; + private static final String CATALOG = "catalog"; + private static final String QUERY = "query"; + protected CatalogIf catalogIf; + protected final String query; + + public QueryTableValueFunction(Map<String, String> params) throws AnalysisException { + if (params.size() != 2) { + throw new AnalysisException("Query TableValueFunction must have 2 arguments: 'catalog' and 'query'"); + } + if (!params.containsKey(CATALOG) || !params.containsKey(QUERY)) { + throw new AnalysisException("Query TableValueFunction must have 2 arguments: 'catalog' and 'query'"); + } + String catalogName = params.get(CATALOG); + this.query = params.get(QUERY); + this.catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName); + } + + public static QueryTableValueFunction createQueryTableValueFunction(Map<String, String> params) + throws AnalysisException { + String catalogName = params.get(CATALOG); + + // check priv + UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity(); + if (!Env.getCurrentEnv().getAuth().checkCtlPriv(userIdentity, catalogName, PrivPredicate.SELECT)) { + throw new org.apache.doris.nereids.exceptions.AnalysisException( + "user " + userIdentity + " has no privilege to query in catalog " + catalogName); + } + + CatalogIf catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName); + if (catalogIf == null) { + throw new AnalysisException("Catalog not found: " + catalogName); + } + if (catalogIf instanceof JdbcExternalCatalog) { + return new JdbcQueryTableValueFunction(params); + } else { + throw new AnalysisException( + "Catalog not supported query tvf: " + catalogName + ", catalog type:" + catalogIf.getType()); + } + } + + @Override + public String getTableName() { + return "QueryTableValueFunction"; + } + + @Override + public abstract List<Column> getTableColumns() throws AnalysisException; + + @Override + public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index 41ed6e14cb2..c99da94de0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -72,6 +72,8 @@ public abstract class TableValuedFunctionIf { return new TasksTableValuedFunction(params); case GroupCommitTableValuedFunction.NAME: return new GroupCommitTableValuedFunction(params); + case QueryTableValueFunction.NAME: + return QueryTableValueFunction.createQueryTableValueFunction(params); default: throw new AnalysisException("Could not find table function " + funcName); } diff --git a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_tvf.out b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_tvf.out new file mode 100644 index 00000000000..27110566d7a --- /dev/null +++ b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_tvf.out @@ -0,0 +1,44 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +bigint BIGINT Yes true \N NONE +bigint_u LARGEINT Yes true \N NONE +binary TEXT Yes true \N NONE +bit TEXT Yes true \N NONE +blob TEXT Yes true \N NONE +boolean TINYINT Yes true \N NONE +char CHAR(6) Yes true \N NONE +date DATE Yes true \N NONE +datetime DATETIME Yes true \N NONE +decimal DECIMAL(12, 4) Yes true \N NONE +decimal_u DECIMAL(19, 5) Yes true \N NONE +double DOUBLE Yes true \N NONE +double_u DOUBLE Yes true \N NONE +enum CHAR(6) Yes true \N NONE +float FLOAT Yes true \N NONE +float_u FLOAT Yes true \N NONE +int INT Yes true \N NONE +int_u BIGINT Yes true \N NONE +json TEXT Yes true \N NONE +mediumint INT Yes true \N NONE +mediumint_u INT Yes true \N NONE +set CHAR(6) Yes true \N NONE +smallint SMALLINT Yes true \N NONE +smallint_u INT Yes true \N NONE +text TEXT Yes true \N NONE +time TEXT Yes true \N NONE +timestamp DATETIME(4) Yes true \N NONE +tinyint TINYINT Yes true \N NONE +tinyint_u SMALLINT Yes true \N NONE +varbinary TEXT Yes true \N NONE +varchar VARCHAR(10) Yes true \N NONE +year SMALLINT Yes true \N NONE + +-- !sql -- +\N 302 \N 502 602 4.14159 \N 6.14159 \N -124 -302 2013 -402 -502 -602 \N 2012-10-26T02:08:39.345700 2013-10-26T08:09:18 -5.14145 \N -7.1400 row2 \N 09:11:09.567 text2 0xE86F6C6C6F20576F726C67 \N \N 0x2F \N 0x88656C6C9F Value3 +201 301 401 501 601 3.14159 4.1415926 5.14159 1 -123 -301 2012 -401 -501 -601 2012-10-30 2012-10-25T12:05:36.345700 2012-10-25T08:08:08 -4.14145 -5.1400000001 -6.1400 row1 line1 09:09:09.567 text1 0x48656C6C6F20576F726C64 {"age": 30, "city": "London", "name": "Alice"} Option1,Option3 0x2A 0x48656C6C6F00000000000000 0x48656C6C6F Value2 +202 302 402 502 602 4.14159 5.1415926 6.14159 0 -124 -302 2013 -402 -502 -602 2012-11-01 2012-10-26T02:08:39.345700 2013-10-26T08:09:18 -5.14145 -6.1400000001 -7.1400 row2 line2 09:11:09.567 text2 0xE86F6C6C6F20576F726C67 {"age": 18, "city": "ChongQing", "name": "Gaoxin"} Option1,Option2 0x2F 0x58676C6C6F00000000000000 0x88656C6C9F Value3 +203 303 403 503 603 7.14159 8.1415926 9.14159 0 \N -402 2017 -602 -902 -1102 2012-11-02 \N 2013-10-27T08:11:18 -5.14145 -6.1400000000001 -7.1400 row3 line3 09:11:09.567 text3 0xE86F6C6C6F20576F726C67 {"age": 24, "city": "ChongQing", "name": "ChenQi"} Option2 0x2F 0x58676C6C6F00000000000000 \N Value1 + +-- !sql -- +4 + diff --git a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_tvf.groovy b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_tvf.groovy new file mode 100644 index 00000000000..dcf36554be1 --- /dev/null +++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_tvf.groovy @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_jdbc_query_tvf") { + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-java-8.0.25.jar" + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String user = "test_jdbc_user"; + String pwd = '123456'; + String catalog_name = "mysql_jdbc_catalog"; + String mysql_port = context.config.otherConfigs.get("mysql_57_port"); + + sql """drop catalog if exists ${catalog_name} """ + + sql """create catalog if not exists ${catalog_name} properties( + "type"="jdbc", + "user"="root", + "password"="123456", + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver" + );""" + + order_qt_sql """desc function query('catalog' = '${catalog_name}', 'query' = 'select * from doris_test.all_types') """ + order_qt_sql """select * from query('catalog' = '${catalog_name}', 'query' = 'select * from doris_test.all_types') """ + order_qt_sql """select * from query('catalog' = '${catalog_name}', 'query' = 'select count(*) as cnt from doris_test.all_types') """ + +// sql """drop catalog if exists ${catalog_name} """ + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org