This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch doris-for-zhongjin in repository https://gitbox.apache.org/repos/asf/doris.git
commit e57f31769d9602ad5c4779b7d78a0576593d6900 Author: morningman <morning...@163.com> AuthorDate: Thu Apr 13 01:02:09 2023 +0800 [zhongjin] support proxy mode to query jdbc table directly --- .../org/apache/doris/external/jdbc/JdbcClient.java | 34 +++++++++++++++++++++ .../java/org/apache/doris/qe/SessionVariable.java | 3 ++ .../java/org/apache/doris/qe/StmtExecutor.java | 35 ++++++++++++++++++++++ 3 files changed, 72 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java index 419ce7590a..1cf1597d8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java @@ -26,6 +26,8 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.Util; +import org.apache.doris.qe.CommonResultSet; +import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData; import com.alibaba.druid.pool.DruidDataSource; import com.google.common.collect.Lists; @@ -41,8 +43,10 @@ import java.net.URLClassLoader; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Types; import java.util.List; import java.util.Map; @@ -832,4 +836,34 @@ public class JdbcClient { } return dorisTableSchema; } + + public CommonResultSet execute(String sql) throws Exception { + try (Connection conn = getConnection()) { + try (Statement stmt = conn.createStatement()) { + try (ResultSet resultSet = stmt.executeQuery(sql)) { + // metadata + List<Column> metaColumns = Lists.newArrayList(); + ResultSetMetaData metaData = resultSet.getMetaData(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + String columnName = metaData.getColumnName(i); + int columnType = metaData.getColumnType(i); + metaColumns.add(new Column(columnName, Type.STRING)); + } + + // data + List<List<String>> rows = Lists.newArrayList(); + while (resultSet.next()) { + List<String> row = Lists.newArrayList(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + row.add(resultSet.getObject(i).toString()); + } + rows.add(row); + } + return new CommonResultSet(new CommonResultSetMetaData(metaColumns), rows); + } + } + } catch (Exception e) { + throw new Exception("execute sql failed. sql: " + sql, e); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index c8719a4b5a..cc94b91205 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -796,6 +796,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = FILE_SPLIT_SIZE, needForward = true) public long fileSplitSize = 0; + @VariableMgr.VarAttr(name = "proxy_mode", needForward = true) + public String proxyMode = ""; + // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables, // not the default value set in the code. public void initFuzzyModeVariables() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 20a684836d..1a616b18d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -54,6 +54,7 @@ import org.apache.doris.analysis.StmtRewriter; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.SwitchStmt; import org.apache.doris.analysis.TableName; +import org.apache.doris.analysis.TableRef; import org.apache.doris.analysis.TransactionBeginStmt; import org.apache.doris.analysis.TransactionCommitStmt; import org.apache.doris.analysis.TransactionRollbackStmt; @@ -65,12 +66,14 @@ import org.apache.doris.analysis.UseStmt; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.JdbcTable; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Type; +import org.apache.doris.catalog.external.JdbcExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.AuditLog; import org.apache.doris.common.Config; @@ -92,6 +95,8 @@ import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.JdbcExternalCatalog; +import org.apache.doris.external.jdbc.JdbcClient; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.LoadJobRowResult; import org.apache.doris.load.loadv2.LoadManager; @@ -1300,9 +1305,38 @@ public class StmtExecutor implements ProfileWriter { } } + if (context.getSessionVariable().proxyMode.equalsIgnoreCase("jdbc")) { + try { + runProxyMode(); + return; + } catch (Exception e) { + LOG.warn("run proxy mode failed", e); + } + } + sendResult(isOutfileQuery, false, queryStmt, channel, null, null); } + private void runProxyMode() throws Exception { + if (!(parsedStmt instanceof SelectStmt)) { + throw new Exception("not select stmt"); + } + SelectStmt stmt = (SelectStmt) parsedStmt; + List<TableRef> tableRefs = stmt.getTableRefs(); + if (tableRefs == null || tableRefs.isEmpty()) { + throw new Exception("no table"); + } + TableIf tableIf = stmt.getTableRefs().get(0).getTable(); + if (!(tableIf instanceof JdbcExternalTable)) { + throw new Exception("not jdbc table"); + } + JdbcExternalTable jdbcTable = (JdbcExternalTable) tableIf; + JdbcExternalCatalog externalCatalog = (JdbcExternalCatalog) jdbcTable.getCatalog(); + JdbcClient jdbcClient = externalCatalog.getJdbcClient(); + ResultSet resultSet = jdbcClient.execute(originStmt.originStmt); + sendResultSet(resultSet); + } + private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable queryStmt, MysqlChannel channel, CacheAnalyzer cacheAnalyzer, InternalService.PFetchCacheResult cacheResult) throws Exception { // 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx, @@ -2259,3 +2293,4 @@ public class StmtExecutor implements ProfileWriter { } + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org