This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch doris-for-zhongjin
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e57f31769d9602ad5c4779b7d78a0576593d6900
Author: morningman <morning...@163.com>
AuthorDate: Thu Apr 13 01:02:09 2023 +0800

    [zhongjin] support proxy mode to query jdbc table directly
---
 .../org/apache/doris/external/jdbc/JdbcClient.java | 34 +++++++++++++++++++++
 .../java/org/apache/doris/qe/SessionVariable.java  |  3 ++
 .../java/org/apache/doris/qe/StmtExecutor.java     | 35 ++++++++++++++++++++++
 3 files changed, 72 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java
index 419ce7590a..1cf1597d8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java
@@ -26,6 +26,8 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.qe.CommonResultSet;
+import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
 
 import com.alibaba.druid.pool.DruidDataSource;
 import com.google.common.collect.Lists;
@@ -41,8 +43,10 @@ import java.net.URLClassLoader;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Types;
 import java.util.List;
 import java.util.Map;
 
@@ -832,4 +836,34 @@ public class JdbcClient {
         }
         return dorisTableSchema;
     }
+
+    public CommonResultSet execute(String sql) throws Exception {
+        try (Connection conn = getConnection()) {
+            try (Statement stmt = conn.createStatement()) {
+                try (ResultSet resultSet = stmt.executeQuery(sql)) {
+                    // metadata
+                    List<Column> metaColumns = Lists.newArrayList();
+                    ResultSetMetaData metaData = resultSet.getMetaData();
+                    for (int i = 1; i <= metaData.getColumnCount(); i++) {
+                        String columnName = metaData.getColumnName(i);
+                        int columnType = metaData.getColumnType(i);
+                        metaColumns.add(new Column(columnName, Type.STRING));
+                    }
+
+                    // data
+                    List<List<String>> rows = Lists.newArrayList();
+                    while (resultSet.next()) {
+                        List<String> row = Lists.newArrayList();
+                        for (int i = 1; i <= metaData.getColumnCount(); i++) {
+                            row.add(resultSet.getObject(i).toString());
+                        }
+                        rows.add(row);
+                    }
+                    return new CommonResultSet(new 
CommonResultSetMetaData(metaColumns), rows);
+                }
+            }
+        } catch (Exception e) {
+            throw new Exception("execute sql failed. sql: " + sql, e);
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index c8719a4b5a..cc94b91205 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -796,6 +796,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = FILE_SPLIT_SIZE, needForward = true)
     public long fileSplitSize = 0;
 
+    @VariableMgr.VarAttr(name = "proxy_mode", needForward = true)
+    public String proxyMode = "";
+
     // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to 
generate some variables,
     // not the default value set in the code.
     public void initFuzzyModeVariables() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 20a684836d..1a616b18d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -54,6 +54,7 @@ import org.apache.doris.analysis.StmtRewriter;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.SwitchStmt;
 import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.TableRef;
 import org.apache.doris.analysis.TransactionBeginStmt;
 import org.apache.doris.analysis.TransactionCommitStmt;
 import org.apache.doris.analysis.TransactionRollbackStmt;
@@ -65,12 +66,14 @@ import org.apache.doris.analysis.UseStmt;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.JdbcTable;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.external.JdbcExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.AuditLog;
 import org.apache.doris.common.Config;
@@ -92,6 +95,8 @@ import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.JdbcExternalCatalog;
+import org.apache.doris.external.jdbc.JdbcClient;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.LoadJobRowResult;
 import org.apache.doris.load.loadv2.LoadManager;
@@ -1300,9 +1305,38 @@ public class StmtExecutor implements ProfileWriter {
             }
         }
 
+        if (context.getSessionVariable().proxyMode.equalsIgnoreCase("jdbc")) {
+            try {
+                runProxyMode();
+                return;
+            } catch (Exception e) {
+                LOG.warn("run proxy mode failed", e);
+            }
+        }
+
         sendResult(isOutfileQuery, false, queryStmt, channel, null, null);
     }
 
+    /** Hands the original statement to the JDBC client of the first table's catalog and sends back its result. */
+    private void runProxyMode() throws Exception {
+        if (!(parsedStmt instanceof SelectStmt)) {
+            throw new Exception("not select stmt");
+        }
+        List<TableRef> tableRefs = ((SelectStmt) parsedStmt).getTableRefs();
+        if (tableRefs == null || tableRefs.isEmpty()) {
+            throw new Exception("no table");
+        }
+        // Only the first table reference is inspected; it alone selects the catalog that serves the query.
+        TableIf tableIf = tableRefs.get(0).getTable();
+        if (!(tableIf instanceof JdbcExternalTable)) {
+            throw new Exception("not jdbc table");
+        }
+        JdbcExternalCatalog externalCatalog = (JdbcExternalCatalog) ((JdbcExternalTable) tableIf).getCatalog();
+        // originStmt.originStmt is handed to the client unchanged (presumably the raw client SQL - confirm).
+        ResultSet resultSet = externalCatalog.getJdbcClient().execute(originStmt.originStmt);
+        sendResultSet(resultSet);
+    }
+
     private void sendResult(boolean isOutfileQuery, boolean isSendFields, 
Queriable queryStmt, MysqlChannel channel,
             CacheAnalyzer cacheAnalyzer, InternalService.PFetchCacheResult 
cacheResult) throws Exception {
         // 1. If this is a query with OUTFILE clause, eg: select * from tbl1 
into outfile xxx,
@@ -2259,3 +2293,4 @@ public class StmtExecutor implements ProfileWriter {
 }
 
 
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to