This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit a071827534cca966b87d1ea412ceb1c3f11058bb Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Sat Jan 21 08:22:22 2023 +0800 [vectorzied](jdbc) fix jdbc executor for get result by batch and memo… (#15843) 1. result set should be fetched by batch size 2. fix memory leak --- be/src/vec/exec/vjdbc_connector.cpp | 6 +- be/src/vec/exec/vjdbc_connector.h | 1 + fe/java-udf/pom.xml | 5 -- .../main/java/org/apache/doris/udf/FakeDriver.java | 70 ++++++++++++++++ .../java/org/apache/doris/udf/JdbcExecutor.java | 96 ++++++++++++---------- 5 files changed, 129 insertions(+), 49 deletions(-) diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index 519ccbbc77..0eb6d6f2a3 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -370,8 +370,8 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns const std::string& column_name = slot_desc->col_name(); jobject column_data = env->CallObjectMethod(block_obj, _executor_get_list_id, materialized_column_index); - jint num_rows = env->CallIntMethod(column_data, _executor_get_list_size_id); - + jint num_rows = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, + _executor_block_rows_id); for (int row = 0; row < num_rows; ++row) { jobject cur_data = env->CallObjectMethod(column_data, _executor_get_list_id, row); RETURN_IF_ERROR(_convert_column_data(env, cur_data, slot_desc, @@ -413,6 +413,8 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { _executor_close_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "hasNext", JDBC_EXECUTOR_HAS_NEXT_SIGNATURE, _executor_has_next_id)); + RETURN_IF_ERROR( + register_id(_executor_clazz, "getCurBlockRows", "()I", _executor_block_rows_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock", JDBC_EXECUTOR_GET_BLOCK_SIGNATURE, _executor_get_blocks_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "convertDateToLong", diff --git 
a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index c1d416783c..4f05253d64 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -114,6 +114,7 @@ private: jmethodID _executor_write_id; jmethodID _executor_read_id; jmethodID _executor_has_next_id; + jmethodID _executor_block_rows_id; jmethodID _executor_get_blocks_id; jmethodID _executor_get_types_id; jmethodID _executor_get_arr_list_id; diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml index 493015d49c..b37e9c4696 100644 --- a/fe/java-udf/pom.xml +++ b/fe/java-udf/pom.xml @@ -104,11 +104,6 @@ under the License. <version>${junit.version}</version> <scope>test</scope> </dependency> - <dependency> - <groupId>com.zaxxer</groupId> - <artifactId>HikariCP</artifactId> - <version>${hikaricp.version}</version> - </dependency> </dependencies> <build> <finalName>java-udf</finalName> diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java b/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java new file mode 100644 index 0000000000..94fbde6217 --- /dev/null +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverPropertyInfo; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.Properties; +import java.util.logging.Logger; + + +public class FakeDriver implements Driver { + private Driver driver; + + FakeDriver(Driver driver) { + this.driver = driver; + } + + @Override + public Connection connect(String url, Properties info) throws SQLException { + return this.driver.connect(url, info); + } + + @Override + public boolean acceptsURL(String url) throws SQLException { + return this.driver.acceptsURL(url); + } + + @Override + public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { + return this.driver.getPropertyInfo(url, info); + } + + @Override + public int getMajorVersion() { + return this.driver.getMajorVersion(); + } + + @Override + public int getMinorVersion() { + return this.driver.getMinorVersion(); + } + + @Override + public boolean jdbcCompliant() { + return this.driver.jdbcCompliant(); + } + + @Override + public Logger getParentLogger() throws SQLFeatureNotSupportedException { + return null; + } +} diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java index d90aa4055a..e76e9e78e3 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java @@ -21,17 +21,20 @@ import org.apache.doris.thrift.TJdbcExecutorCtorParams; import org.apache.doris.thrift.TJdbcOperation; import com.google.common.base.Preconditions; -import com.zaxxer.hikari.HikariConfig; -import com.zaxxer.hikari.HikariDataSource; import org.apache.log4j.Logger; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; 
import org.apache.thrift.protocol.TBinaryProtocol; -import java.io.FileNotFoundException; +import java.io.File; +import java.lang.reflect.InvocationTargetException; import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; import java.sql.Connection; import java.sql.Date; +import java.sql.Driver; +import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; @@ -50,10 +53,12 @@ public class JdbcExecutor { private Statement stmt = null; private ResultSet resultSet = null; private ResultSetMetaData resultSetMetaData = null; - // Use HikariDataSource to help us manage the JDBC connections. - private HikariDataSource dataSource = null; private List<String> resultColumnTypeNames = null; private int baseTypeInt = 0; + private URLClassLoader classLoader = null; + private List<List<Object>> block = null; + private int bacthSizeNum = 0; + private int curBlockRows = 0; public JdbcExecutor(byte[] thriftParams) throws Exception { TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams(); @@ -75,22 +80,31 @@ public class JdbcExecutor { stmt.close(); } if (conn != null) { + conn.clearWarnings(); conn.close(); } - if (dataSource != null) { - dataSource.close(); + if (classLoader != null) { + classLoader.clearAssertionStatus(); + classLoader.close(); } resultSet = null; stmt = null; conn = null; - dataSource = null; + classLoader = null; } public int read() throws UdfRuntimeException { try { resultSet = ((PreparedStatement) stmt).executeQuery(); resultSetMetaData = resultSet.getMetaData(); - return resultSetMetaData.getColumnCount(); + int columnCount = resultSetMetaData.getColumnCount(); + resultColumnTypeNames = new ArrayList<>(columnCount); + block = new ArrayList<>(columnCount); + for (int i = 0; i < columnCount; ++i) { + resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1)); + block.add(Arrays.asList(new Object[bacthSizeNum])); + } + return columnCount; 
} catch (SQLException e) { throw new UdfRuntimeException("JDBC executor sql has error: ", e); } @@ -111,17 +125,8 @@ public class JdbcExecutor { } } - public List<String> getResultColumnTypeNames() throws UdfRuntimeException { - try { - int count = resultSetMetaData.getColumnCount(); - resultColumnTypeNames = new ArrayList<>(count); - for (int i = 0; i < count; ++i) { - resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1)); - } - return resultColumnTypeNames; - } catch (SQLException e) { - throw new UdfRuntimeException("JDBC executor getResultColumnTypeNames has error: ", e); - } + public List<String> getResultColumnTypeNames() { + return resultColumnTypeNames; } public List<Object> getArrayColumnData(Object object) throws UdfRuntimeException { @@ -169,20 +174,15 @@ public class JdbcExecutor { } public List<List<Object>> getBlock(int batchSize) throws UdfRuntimeException { - List<List<Object>> block = null; try { int columnCount = resultSetMetaData.getColumnCount(); - block = new ArrayList<>(columnCount); - for (int i = 0; i < columnCount; ++i) { - block.add(new ArrayList<>(batchSize)); - } - int numRows = 0; + curBlockRows = 0; do { for (int i = 0; i < columnCount; ++i) { - block.get(i).add(resultSet.getObject(i + 1)); + block.get(i).set(curBlockRows, resultSet.getObject(i + 1)); } - numRows++; - } while (numRows < batchSize && resultSet.next()); + curBlockRows++; + } while (curBlockRows < batchSize && resultSet.next()); } catch (SQLException e) { throw new UdfRuntimeException("get next block failed: ", e); } catch (Exception e) { @@ -191,6 +191,10 @@ public class JdbcExecutor { return block; } + public int getCurBlockRows() { + return curBlockRows; + } + public boolean hasNext() throws UdfRuntimeException { try { if (resultSet == null) { @@ -242,33 +246,41 @@ public class JdbcExecutor { private void init(String driverUrl, String sql, int batchSize, String driverClass, String jdbcUrl, String jdbcUser, String jdbcPassword, TJdbcOperation op) throws 
UdfRuntimeException { try { - ClassLoader parent = getClass().getClassLoader(); - ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl, parent); - Thread.currentThread().setContextClassLoader(classLoader); - HikariConfig config = new HikariConfig(); - config.setDriverClassName(driverClass); - config.setJdbcUrl(jdbcUrl); - config.setUsername(jdbcUser); - config.setPassword(jdbcPassword); - config.setMaximumPoolSize(1); + File file = new File(driverUrl); + URL url = file.toURI().toURL(); + classLoader = new URLClassLoader(new URL[] {url}); + Driver driver = (Driver) Class.forName(driverClass, true, classLoader).getDeclaredConstructor() + .newInstance(); + // in jdk11 cann't call addURL function by reflect to load class. so use this way + // But DriverManager can't find the driverClass correctly, so add a faker driver + // https://www.kfu.com/~nsayer/Java/dyn-jdbc.html + DriverManager.registerDriver(new FakeDriver(driver)); + conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword); - dataSource = new HikariDataSource(config); - conn = dataSource.getConnection(); if (op == TJdbcOperation.READ) { conn.setAutoCommit(false); Preconditions.checkArgument(sql != null); stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.FETCH_FORWARD); stmt.setFetchSize(batchSize); + bacthSizeNum = batchSize; } else { stmt = conn.createStatement(); } - } catch (FileNotFoundException e) { - throw new UdfRuntimeException("Can not find driver file: " + driverUrl, e); + } catch (ClassNotFoundException e) { + throw new UdfRuntimeException("ClassNotFoundException: " + driverClass, e); } catch (MalformedURLException e) { throw new UdfRuntimeException("MalformedURLException to load class about " + driverUrl, e); } catch (SQLException e) { throw new UdfRuntimeException("Initialize datasource failed: ", e); + } catch (InstantiationException e) { + throw new UdfRuntimeException("InstantiationException failed: ", e); + } catch 
(IllegalAccessException e) { + throw new UdfRuntimeException("IllegalAccessException failed: ", e); + } catch (InvocationTargetException e) { + throw new UdfRuntimeException("InvocationTargetException new instance failed: ", e); + } catch (NoSuchMethodException e) { + throw new UdfRuntimeException("NoSuchMethodException Load class failed: ", e); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org