This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 253445ca46 [vectorzied](jdbc) fix jdbc executor for get result by 
batch and memo… (#15843)
253445ca46 is described below

commit 253445ca466cc105bf6ebfce5524cfeae894767c
Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com>
AuthorDate: Sat Jan 21 08:22:22 2023 +0800

    [vectorzied](jdbc) fix jdbc executor for get result by batch and memo… 
(#15843)
    
    1. Fetch the result set batch by batch (batchSize rows at a time).
    2. Fix a memory leak.
---
 be/src/vec/exec/vjdbc_connector.cpp                |  6 +-
 be/src/vec/exec/vjdbc_connector.h                  |  1 +
 docs/zh-CN/docs/lakehouse/external-table/jdbc.md   |  6 +-
 docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md    |  9 ++
 fe/java-udf/pom.xml                                |  5 --
 .../main/java/org/apache/doris/udf/FakeDriver.java | 70 ++++++++++++++++
 .../java/org/apache/doris/udf/JdbcExecutor.java    | 96 ++++++++++++----------
 7 files changed, 141 insertions(+), 52 deletions(-)

diff --git a/be/src/vec/exec/vjdbc_connector.cpp 
b/be/src/vec/exec/vjdbc_connector.cpp
index adc1ac4cd2..d9e6838925 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -370,8 +370,8 @@ Status JdbcConnector::get_next(bool* eos, 
std::vector<MutableColumnPtr>& columns
         const std::string& column_name = slot_desc->col_name();
         jobject column_data =
                 env->CallObjectMethod(block_obj, _executor_get_list_id, 
materialized_column_index);
-        jint num_rows = env->CallIntMethod(column_data, 
_executor_get_list_size_id);
-
+        jint num_rows = env->CallNonvirtualIntMethod(_executor_obj, 
_executor_clazz,
+                                                     _executor_block_rows_id);
         for (int row = 0; row < num_rows; ++row) {
             jobject cur_data = env->CallObjectMethod(column_data, 
_executor_get_list_id, row);
             RETURN_IF_ERROR(_convert_column_data(env, cur_data, slot_desc,
@@ -412,6 +412,8 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
                                 _executor_close_id));
     RETURN_IF_ERROR(register_id(_executor_clazz, "hasNext", 
JDBC_EXECUTOR_HAS_NEXT_SIGNATURE,
                                 _executor_has_next_id));
+    RETURN_IF_ERROR(
+            register_id(_executor_clazz, "getCurBlockRows", "()I", 
_executor_block_rows_id));
     RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock", 
JDBC_EXECUTOR_GET_BLOCK_SIGNATURE,
                                 _executor_get_blocks_id));
     RETURN_IF_ERROR(register_id(_executor_clazz, "convertDateToLong",
diff --git a/be/src/vec/exec/vjdbc_connector.h 
b/be/src/vec/exec/vjdbc_connector.h
index c1d416783c..4f05253d64 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -114,6 +114,7 @@ private:
     jmethodID _executor_write_id;
     jmethodID _executor_read_id;
     jmethodID _executor_has_next_id;
+    jmethodID _executor_block_rows_id;
     jmethodID _executor_get_blocks_id;
     jmethodID _executor_get_types_id;
     jmethodID _executor_get_arr_list_id;
diff --git a/docs/zh-CN/docs/lakehouse/external-table/jdbc.md 
b/docs/zh-CN/docs/lakehouse/external-table/jdbc.md
index f87cfe3bf2..4c53d27243 100644
--- a/docs/zh-CN/docs/lakehouse/external-table/jdbc.md
+++ b/docs/zh-CN/docs/lakehouse/external-table/jdbc.md
@@ -55,7 +55,7 @@ properties (
     "type"="jdbc",
     "user"="root",
     "password"="123456",
-    "jdbc_url"="jdbc:mysql://192.168.0.1:3306/test",
+    "jdbc_url"="jdbc:mysql://192.168.0.1:3306/test?useCursorFetch=true",
     "driver_url"="http://IP:port/mysql-connector-java-5.1.47.jar",
     "driver_class"="com.mysql.jdbc.Driver"
 );
@@ -81,7 +81,7 @@ PROPERTIES (
 | **type**         | "jdbc", 必填项标志资源类型  |
 | **user**         | 访问外表数据库所使的用户名 |
 | **password**     | 该用户对应的密码信息 |
-| **jdbc_url**     | JDBC的URL协议,包括数据库类型,IP地址,端口号和数据库名,不同数据库协议格式不一样。例如mysql: 
"jdbc:mysql://127.0.0.1:3306/test"。|
+| **jdbc_url**     | JDBC的URL协议,包括数据库类型,IP地址,端口号和数据库名,不同数据库协议格式不一样。例如mysql: 
"jdbc:mysql://127.0.0.1:3306/test?useCursorFetch=true"。|
 | **driver_class** | 访问外表数据库的驱动包类名,例如mysql是:com.mysql.jdbc.Driver. |
 | **driver_url**   | 
用于下载访问外部数据库的jar包驱动URL。http://IP:port/mysql-connector-java-5.1.47.jar。本地单机测试时,可将jar包放在本地路径下,"driver_url"="file:///home/disk1/pathTo/mysql-connector-java-5.1.47.jar",多机时需保证具有完全相同的路径信息。
 |
 | **resource**     | 在Doris中建立外表时依赖的资源名,对应上步创建资源时的名字。|
@@ -266,4 +266,4 @@ PROPERTIES (
 
 ## Q&A
 
-请参考 [JDBC Catalog](../multi-catalog/jdbc) 中的 常见问题一节。
\ No newline at end of file
+请参考 [JDBC Catalog](../multi-catalog/jdbc) 中的 常见问题一节。
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md 
b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md
index 136dfd41db..7d16eeace3 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md
@@ -332,3 +332,12 @@ JDBC Catalog 通过标准 JDBC 协议,连接其他数据源。
     ```
 
     可在创建 Catalog 的 `jdbc_url` 把JDBC连接串最后增加 `?useSSL=false` ,如 `"jdbc_url" = 
"jdbc:mysql://127.0.0.1:3306/test?useSSL=false"`
+
+6. 查询MYSQL的数据库报OutOfMemoryError的错误
+
+    为减少内存的使用,在获取结果集时,每次仅获取batchSize的大小,这样一批一批的获取结果。而MYSQL默认是一次将结果全部加载到内存,
+    
设置的按批获取无法生效,需要主动显示的在URL中指定:"jdbc_url"="jdbc:mysql://IP:PORT/doris_test?useCursorFetch=true"
+
+ 7. 在使用JDBC查询过程中时,如果出现"CAUSED BY: SQLException OutOfMemoryError" 类似的错误
+
+    
如果MYSQL已经主动设置useCursorFetch,可以在be.conf中修改jvm_max_heap_size的值,尝试增大JVM的内存,目前默认值为1024M。
\ No newline at end of file
diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml
index 493015d49c..b37e9c4696 100644
--- a/fe/java-udf/pom.xml
+++ b/fe/java-udf/pom.xml
@@ -104,11 +104,6 @@ under the License.
             <version>${junit.version}</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>com.zaxxer</groupId>
-            <artifactId>HikariCP</artifactId>
-            <version>${hikaricp.version}</version>
-        </dependency>
     </dependencies>
     <build>
         <finalName>java-udf</finalName>
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java 
b/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java
new file mode 100644
index 0000000000..94fbde6217
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+
+public class FakeDriver implements Driver {
+    private Driver driver;
+
+    FakeDriver(Driver driver) {
+        this.driver = driver;
+    }
+
+    @Override
+    public Connection connect(String url, Properties info) throws SQLException 
{
+        return this.driver.connect(url, info);
+    }
+
+    @Override
+    public boolean acceptsURL(String url) throws SQLException {
+        return this.driver.acceptsURL(url);
+    }
+
+    @Override
+    public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) 
throws SQLException {
+        return this.driver.getPropertyInfo(url, info);
+    }
+
+    @Override
+    public int getMajorVersion() {
+        return this.driver.getMajorVersion();
+    }
+
+    @Override
+    public int getMinorVersion() {
+        return this.driver.getMinorVersion();
+    }
+
+    @Override
+    public boolean jdbcCompliant() {
+        return this.driver.jdbcCompliant();
+    }
+
+    @Override
+    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+        return null;
+    }
+}
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java 
b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index d90aa4055a..e76e9e78e3 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -21,17 +21,20 @@ import org.apache.doris.thrift.TJdbcExecutorCtorParams;
 import org.apache.doris.thrift.TJdbcOperation;
 
 import com.google.common.base.Preconditions;
-import com.zaxxer.hikari.HikariConfig;
-import com.zaxxer.hikari.HikariDataSource;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 
-import java.io.FileNotFoundException;
+import java.io.File;
+import java.lang.reflect.InvocationTargetException;
 import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.sql.Connection;
 import java.sql.Date;
+import java.sql.Driver;
+import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
@@ -50,10 +53,12 @@ public class JdbcExecutor {
     private Statement stmt = null;
     private ResultSet resultSet = null;
     private ResultSetMetaData resultSetMetaData = null;
-    // Use HikariDataSource to help us manage the JDBC connections.
-    private HikariDataSource dataSource = null;
     private List<String> resultColumnTypeNames = null;
     private int baseTypeInt = 0;
+    private URLClassLoader classLoader = null;
+    private List<List<Object>> block = null;
+    private int bacthSizeNum = 0;
+    private int curBlockRows = 0;
 
     public JdbcExecutor(byte[] thriftParams) throws Exception {
         TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
@@ -75,22 +80,31 @@ public class JdbcExecutor {
             stmt.close();
         }
         if (conn != null) {
+            conn.clearWarnings();
             conn.close();
         }
-        if (dataSource != null) {
-            dataSource.close();
+        if (classLoader != null) {
+            classLoader.clearAssertionStatus();
+            classLoader.close();
         }
         resultSet = null;
         stmt = null;
         conn = null;
-        dataSource = null;
+        classLoader = null;
     }
 
     public int read() throws UdfRuntimeException {
         try {
             resultSet = ((PreparedStatement) stmt).executeQuery();
             resultSetMetaData = resultSet.getMetaData();
-            return resultSetMetaData.getColumnCount();
+            int columnCount = resultSetMetaData.getColumnCount();
+            resultColumnTypeNames = new ArrayList<>(columnCount);
+            block = new ArrayList<>(columnCount);
+            for (int i = 0; i < columnCount; ++i) {
+                
resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1));
+                block.add(Arrays.asList(new Object[bacthSizeNum]));
+            }
+            return columnCount;
         } catch (SQLException e) {
             throw new UdfRuntimeException("JDBC executor sql has error: ", e);
         }
@@ -111,17 +125,8 @@ public class JdbcExecutor {
         }
     }
 
-    public List<String> getResultColumnTypeNames() throws UdfRuntimeException {
-        try {
-            int count = resultSetMetaData.getColumnCount();
-            resultColumnTypeNames = new ArrayList<>(count);
-            for (int i = 0; i < count; ++i) {
-                
resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1));
-            }
-            return resultColumnTypeNames;
-        } catch (SQLException e) {
-            throw new UdfRuntimeException("JDBC executor 
getResultColumnTypeNames has error: ", e);
-        }
+    public List<String> getResultColumnTypeNames() {
+        return resultColumnTypeNames;
     }
 
     public List<Object> getArrayColumnData(Object object) throws 
UdfRuntimeException {
@@ -169,20 +174,15 @@ public class JdbcExecutor {
     }
 
     public List<List<Object>> getBlock(int batchSize) throws 
UdfRuntimeException {
-        List<List<Object>> block = null;
         try {
             int columnCount = resultSetMetaData.getColumnCount();
-            block = new ArrayList<>(columnCount);
-            for (int i = 0; i < columnCount; ++i) {
-                block.add(new ArrayList<>(batchSize));
-            }
-            int numRows = 0;
+            curBlockRows = 0;
             do {
                 for (int i = 0; i < columnCount; ++i) {
-                    block.get(i).add(resultSet.getObject(i + 1));
+                    block.get(i).set(curBlockRows, resultSet.getObject(i + 1));
                 }
-                numRows++;
-            } while (numRows < batchSize && resultSet.next());
+                curBlockRows++;
+            } while (curBlockRows < batchSize && resultSet.next());
         } catch (SQLException e) {
             throw new UdfRuntimeException("get next block failed: ", e);
         } catch (Exception e) {
@@ -191,6 +191,10 @@ public class JdbcExecutor {
         return block;
     }
 
+    public int getCurBlockRows() {
+        return curBlockRows;
+    }
+
     public boolean hasNext() throws UdfRuntimeException {
         try {
             if (resultSet == null) {
@@ -242,33 +246,41 @@ public class JdbcExecutor {
     private void init(String driverUrl, String sql, int batchSize, String 
driverClass, String jdbcUrl, String jdbcUser,
             String jdbcPassword, TJdbcOperation op) throws UdfRuntimeException 
{
         try {
-            ClassLoader parent = getClass().getClassLoader();
-            ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl, 
parent);
-            Thread.currentThread().setContextClassLoader(classLoader);
-            HikariConfig config = new HikariConfig();
-            config.setDriverClassName(driverClass);
-            config.setJdbcUrl(jdbcUrl);
-            config.setUsername(jdbcUser);
-            config.setPassword(jdbcPassword);
-            config.setMaximumPoolSize(1);
+            File file = new File(driverUrl);
+            URL url = file.toURI().toURL();
+            classLoader = new URLClassLoader(new URL[] {url});
+            Driver driver = (Driver) Class.forName(driverClass, true, 
classLoader).getDeclaredConstructor()
+                    .newInstance();
+            // in jdk11 cann't call addURL function by reflect to load class. 
so use this way
+            // But DriverManager can't find the driverClass correctly, so add 
a faker driver
+            // https://www.kfu.com/~nsayer/Java/dyn-jdbc.html
+            DriverManager.registerDriver(new FakeDriver(driver));
+            conn = DriverManager.getConnection(jdbcUrl, jdbcUser, 
jdbcPassword);
 
-            dataSource = new HikariDataSource(config);
-            conn = dataSource.getConnection();
             if (op == TJdbcOperation.READ) {
                 conn.setAutoCommit(false);
                 Preconditions.checkArgument(sql != null);
                 stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY,
                         ResultSet.FETCH_FORWARD);
                 stmt.setFetchSize(batchSize);
+                bacthSizeNum = batchSize;
             } else {
                 stmt = conn.createStatement();
             }
-        } catch (FileNotFoundException e) {
-            throw new UdfRuntimeException("Can not find driver file:  " + 
driverUrl, e);
+        } catch (ClassNotFoundException e) {
+            throw new UdfRuntimeException("ClassNotFoundException:  " + 
driverClass, e);
         } catch (MalformedURLException e) {
             throw new UdfRuntimeException("MalformedURLException to load class 
about " + driverUrl, e);
         } catch (SQLException e) {
             throw new UdfRuntimeException("Initialize datasource failed: ", e);
+        } catch (InstantiationException e) {
+            throw new UdfRuntimeException("InstantiationException failed: ", 
e);
+        } catch (IllegalAccessException e) {
+            throw new UdfRuntimeException("IllegalAccessException failed: ", 
e);
+        } catch (InvocationTargetException e) {
+            throw new UdfRuntimeException("InvocationTargetException new 
instance failed: ", e);
+        } catch (NoSuchMethodException e) {
+            throw new UdfRuntimeException("NoSuchMethodException Load class 
failed: ", e);
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to