This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 253445ca46 [vectorzied](jdbc) fix jdbc executor for get result by batch and memo… (#15843) 253445ca46 is described below commit 253445ca466cc105bf6ebfce5524cfeae894767c Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Sat Jan 21 08:22:22 2023 +0800 [vectorzied](jdbc) fix jdbc executor for get result by batch and memo… (#15843) 1. result set should be fetched by batch size. 2. fix memory leak. --- be/src/vec/exec/vjdbc_connector.cpp | 6 +- be/src/vec/exec/vjdbc_connector.h | 1 + docs/zh-CN/docs/lakehouse/external-table/jdbc.md | 6 +- docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md | 9 ++ fe/java-udf/pom.xml | 5 -- .../main/java/org/apache/doris/udf/FakeDriver.java | 70 ++++++++++++++++ .../java/org/apache/doris/udf/JdbcExecutor.java | 96 ++++++++++++---------- 7 files changed, 141 insertions(+), 52 deletions(-) diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index adc1ac4cd2..d9e6838925 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -370,8 +370,8 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns const std::string& column_name = slot_desc->col_name(); jobject column_data = env->CallObjectMethod(block_obj, _executor_get_list_id, materialized_column_index); - jint num_rows = env->CallIntMethod(column_data, _executor_get_list_size_id); - + jint num_rows = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, + _executor_block_rows_id); for (int row = 0; row < num_rows; ++row) { jobject cur_data = env->CallObjectMethod(column_data, _executor_get_list_id, row); RETURN_IF_ERROR(_convert_column_data(env, cur_data, slot_desc, @@ -412,6 +412,8 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { _executor_close_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "hasNext", JDBC_EXECUTOR_HAS_NEXT_SIGNATURE, _executor_has_next_id)); + RETURN_IF_ERROR( 
+ register_id(_executor_clazz, "getCurBlockRows", "()I", _executor_block_rows_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock", JDBC_EXECUTOR_GET_BLOCK_SIGNATURE, _executor_get_blocks_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "convertDateToLong", diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index c1d416783c..4f05253d64 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -114,6 +114,7 @@ private: jmethodID _executor_write_id; jmethodID _executor_read_id; jmethodID _executor_has_next_id; + jmethodID _executor_block_rows_id; jmethodID _executor_get_blocks_id; jmethodID _executor_get_types_id; jmethodID _executor_get_arr_list_id; diff --git a/docs/zh-CN/docs/lakehouse/external-table/jdbc.md b/docs/zh-CN/docs/lakehouse/external-table/jdbc.md index f87cfe3bf2..4c53d27243 100644 --- a/docs/zh-CN/docs/lakehouse/external-table/jdbc.md +++ b/docs/zh-CN/docs/lakehouse/external-table/jdbc.md @@ -55,7 +55,7 @@ properties ( "type"="jdbc", "user"="root", "password"="123456", - "jdbc_url"="jdbc:mysql://192.168.0.1:3306/test", + "jdbc_url"="jdbc:mysql://192.168.0.1:3306/test?useCursorFetch=true", "driver_url"="http://IP:port/mysql-connector-java-5.1.47.jar", "driver_class"="com.mysql.jdbc.Driver" ); @@ -81,7 +81,7 @@ PROPERTIES ( | **type** | "jdbc", 必填项标志资源类型 | | **user** | 访问外表数据库所使的用户名 | | **password** | 该用户对应的密码信息 | -| **jdbc_url** | JDBC的URL协议,包括数据库类型,IP地址,端口号和数据库名,不同数据库协议格式不一样。例如mysql: "jdbc:mysql://127.0.0.1:3306/test"。| +| **jdbc_url** | JDBC的URL协议,包括数据库类型,IP地址,端口号和数据库名,不同数据库协议格式不一样。例如mysql: "jdbc:mysql://127.0.0.1:3306/test?useCursorFetch=true"。| | **driver_class** | 访问外表数据库的驱动包类名,例如mysql是:com.mysql.jdbc.Driver. 
| | **driver_url** | 用于下载访问外部数据库的jar包驱动URL。http://IP:port/mysql-connector-java-5.1.47.jar。本地单机测试时,可将jar包放在本地路径下,"driver_url"="file:///home/disk1/pathTo/mysql-connector-java-5.1.47.jar",多机时需保证具有完全相同的路径信息。 | | **resource** | 在Doris中建立外表时依赖的资源名,对应上步创建资源时的名字。| @@ -266,4 +266,4 @@ PROPERTIES ( ## Q&A -请参考 [JDBC Catalog](../multi-catalog/jdbc) 中的 常见问题一节。 \ No newline at end of file +请参考 [JDBC Catalog](../multi-catalog/jdbc) 中的 常见问题一节。 diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md index 136dfd41db..7d16eeace3 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md @@ -332,3 +332,12 @@ JDBC Catalog 通过标准 JDBC 协议,连接其他数据源。 ``` 可在创建 Catalog 的 `jdbc_url` 把JDBC连接串最后增加 `?useSSL=false` ,如 `"jdbc_url" = "jdbc:mysql://127.0.0.1:3306/test?useSSL=false"` + +6. 查询MYSQL的数据库报OutOfMemoryError的错误 + + 为减少内存的使用,在获取结果集时,每次仅获取batchSize的大小,这样一批一批的获取结果。而MYSQL默认是一次将结果全部加载到内存, + 设置的按批获取无法生效,需要主动显示的在URL中指定:"jdbc_url"="jdbc:mysql://IP:PORT/doris_test?useCursorFetch=true" + + 7. 在使用JDBC查询过程中时,如果出现"CAUSED BY: SQLException OutOfMemoryError" 类似的错误 + + 如果MYSQL已经主动设置useCursorFetch,可以在be.conf中修改jvm_max_heap_size的值,尝试增大JVM的内存,目前默认值为1024M。 \ No newline at end of file diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml index 493015d49c..b37e9c4696 100644 --- a/fe/java-udf/pom.xml +++ b/fe/java-udf/pom.xml @@ -104,11 +104,6 @@ under the License. 
<version>${junit.version}</version> <scope>test</scope> </dependency> - <dependency> - <groupId>com.zaxxer</groupId> - <artifactId>HikariCP</artifactId> - <version>${hikaricp.version}</version> - </dependency> </dependencies> <build> <finalName>java-udf</finalName> diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java b/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java new file mode 100644 index 0000000000..94fbde6217 --- /dev/null +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.udf; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverPropertyInfo; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.Properties; +import java.util.logging.Logger; + + +public class FakeDriver implements Driver { + private Driver driver; + + FakeDriver(Driver driver) { + this.driver = driver; + } + + @Override + public Connection connect(String url, Properties info) throws SQLException { + return this.driver.connect(url, info); + } + + @Override + public boolean acceptsURL(String url) throws SQLException { + return this.driver.acceptsURL(url); + } + + @Override + public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { + return this.driver.getPropertyInfo(url, info); + } + + @Override + public int getMajorVersion() { + return this.driver.getMajorVersion(); + } + + @Override + public int getMinorVersion() { + return this.driver.getMinorVersion(); + } + + @Override + public boolean jdbcCompliant() { + return this.driver.jdbcCompliant(); + } + + @Override + public Logger getParentLogger() throws SQLFeatureNotSupportedException { + return null; + } +} diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java index d90aa4055a..e76e9e78e3 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java @@ -21,17 +21,20 @@ import org.apache.doris.thrift.TJdbcExecutorCtorParams; import org.apache.doris.thrift.TJdbcOperation; import com.google.common.base.Preconditions; -import com.zaxxer.hikari.HikariConfig; -import com.zaxxer.hikari.HikariDataSource; import org.apache.log4j.Logger; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; -import java.io.FileNotFoundException; +import 
java.io.File; +import java.lang.reflect.InvocationTargetException; import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; import java.sql.Connection; import java.sql.Date; +import java.sql.Driver; +import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; @@ -50,10 +53,12 @@ public class JdbcExecutor { private Statement stmt = null; private ResultSet resultSet = null; private ResultSetMetaData resultSetMetaData = null; - // Use HikariDataSource to help us manage the JDBC connections. - private HikariDataSource dataSource = null; private List<String> resultColumnTypeNames = null; private int baseTypeInt = 0; + private URLClassLoader classLoader = null; + private List<List<Object>> block = null; + private int bacthSizeNum = 0; + private int curBlockRows = 0; public JdbcExecutor(byte[] thriftParams) throws Exception { TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams(); @@ -75,22 +80,31 @@ public class JdbcExecutor { stmt.close(); } if (conn != null) { + conn.clearWarnings(); conn.close(); } - if (dataSource != null) { - dataSource.close(); + if (classLoader != null) { + classLoader.clearAssertionStatus(); + classLoader.close(); } resultSet = null; stmt = null; conn = null; - dataSource = null; + classLoader = null; } public int read() throws UdfRuntimeException { try { resultSet = ((PreparedStatement) stmt).executeQuery(); resultSetMetaData = resultSet.getMetaData(); - return resultSetMetaData.getColumnCount(); + int columnCount = resultSetMetaData.getColumnCount(); + resultColumnTypeNames = new ArrayList<>(columnCount); + block = new ArrayList<>(columnCount); + for (int i = 0; i < columnCount; ++i) { + resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1)); + block.add(Arrays.asList(new Object[bacthSizeNum])); + } + return columnCount; } catch (SQLException e) { throw new UdfRuntimeException("JDBC executor sql has error: ", e); } 
@@ -111,17 +125,8 @@ public class JdbcExecutor { } } - public List<String> getResultColumnTypeNames() throws UdfRuntimeException { - try { - int count = resultSetMetaData.getColumnCount(); - resultColumnTypeNames = new ArrayList<>(count); - for (int i = 0; i < count; ++i) { - resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1)); - } - return resultColumnTypeNames; - } catch (SQLException e) { - throw new UdfRuntimeException("JDBC executor getResultColumnTypeNames has error: ", e); - } + public List<String> getResultColumnTypeNames() { + return resultColumnTypeNames; } public List<Object> getArrayColumnData(Object object) throws UdfRuntimeException { @@ -169,20 +174,15 @@ public class JdbcExecutor { } public List<List<Object>> getBlock(int batchSize) throws UdfRuntimeException { - List<List<Object>> block = null; try { int columnCount = resultSetMetaData.getColumnCount(); - block = new ArrayList<>(columnCount); - for (int i = 0; i < columnCount; ++i) { - block.add(new ArrayList<>(batchSize)); - } - int numRows = 0; + curBlockRows = 0; do { for (int i = 0; i < columnCount; ++i) { - block.get(i).add(resultSet.getObject(i + 1)); + block.get(i).set(curBlockRows, resultSet.getObject(i + 1)); } - numRows++; - } while (numRows < batchSize && resultSet.next()); + curBlockRows++; + } while (curBlockRows < batchSize && resultSet.next()); } catch (SQLException e) { throw new UdfRuntimeException("get next block failed: ", e); } catch (Exception e) { @@ -191,6 +191,10 @@ public class JdbcExecutor { return block; } + public int getCurBlockRows() { + return curBlockRows; + } + public boolean hasNext() throws UdfRuntimeException { try { if (resultSet == null) { @@ -242,33 +246,41 @@ public class JdbcExecutor { private void init(String driverUrl, String sql, int batchSize, String driverClass, String jdbcUrl, String jdbcUser, String jdbcPassword, TJdbcOperation op) throws UdfRuntimeException { try { - ClassLoader parent = getClass().getClassLoader(); - ClassLoader 
classLoader = UdfUtils.getClassLoader(driverUrl, parent); - Thread.currentThread().setContextClassLoader(classLoader); - HikariConfig config = new HikariConfig(); - config.setDriverClassName(driverClass); - config.setJdbcUrl(jdbcUrl); - config.setUsername(jdbcUser); - config.setPassword(jdbcPassword); - config.setMaximumPoolSize(1); + File file = new File(driverUrl); + URL url = file.toURI().toURL(); + classLoader = new URLClassLoader(new URL[] {url}); + Driver driver = (Driver) Class.forName(driverClass, true, classLoader).getDeclaredConstructor() + .newInstance(); + // in jdk11 cann't call addURL function by reflect to load class. so use this way + // But DriverManager can't find the driverClass correctly, so add a faker driver + // https://www.kfu.com/~nsayer/Java/dyn-jdbc.html + DriverManager.registerDriver(new FakeDriver(driver)); + conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword); - dataSource = new HikariDataSource(config); - conn = dataSource.getConnection(); if (op == TJdbcOperation.READ) { conn.setAutoCommit(false); Preconditions.checkArgument(sql != null); stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.FETCH_FORWARD); stmt.setFetchSize(batchSize); + bacthSizeNum = batchSize; } else { stmt = conn.createStatement(); } - } catch (FileNotFoundException e) { - throw new UdfRuntimeException("Can not find driver file: " + driverUrl, e); + } catch (ClassNotFoundException e) { + throw new UdfRuntimeException("ClassNotFoundException: " + driverClass, e); } catch (MalformedURLException e) { throw new UdfRuntimeException("MalformedURLException to load class about " + driverUrl, e); } catch (SQLException e) { throw new UdfRuntimeException("Initialize datasource failed: ", e); + } catch (InstantiationException e) { + throw new UdfRuntimeException("InstantiationException failed: ", e); + } catch (IllegalAccessException e) { + throw new UdfRuntimeException("IllegalAccessException failed: ", e); + } 
catch (InvocationTargetException e) { + throw new UdfRuntimeException("InvocationTargetException new instance failed: ", e); + } catch (NoSuchMethodException e) { + throw new UdfRuntimeException("NoSuchMethodException Load class failed: ", e); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org