github-actions[bot] commented on code in PR #61084:
URL: https://github.com/apache/doris/pull/61084#discussion_r2893868042
##########
be/src/vec/sink/writer/vjdbc_table_writer.cpp:
##########
@@ -22,61 +22,83 @@
#include <sstream>
-#include "util/binary_cast.hpp"
+#include "runtime/runtime_state.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
namespace doris {
namespace vectorized {
-JdbcConnectorParam VJdbcTableWriter::create_connect_param(const
doris::TDataSink& t_sink) {
+std::map<std::string, std::string>
VJdbcTableWriter::_build_writer_params(const TDataSink& t_sink) {
const TJdbcTableSink& t_jdbc_sink = t_sink.jdbc_table_sink;
+ std::map<std::string, std::string> params;
- JdbcConnectorParam jdbc_param;
-
- jdbc_param.catalog_id = t_jdbc_sink.jdbc_table.catalog_id;
- jdbc_param.jdbc_url = t_jdbc_sink.jdbc_table.jdbc_url;
- jdbc_param.user = t_jdbc_sink.jdbc_table.jdbc_user;
- jdbc_param.passwd = t_jdbc_sink.jdbc_table.jdbc_password;
- jdbc_param.driver_class = t_jdbc_sink.jdbc_table.jdbc_driver_class;
- jdbc_param.driver_path = t_jdbc_sink.jdbc_table.jdbc_driver_url;
- jdbc_param.driver_checksum = t_jdbc_sink.jdbc_table.jdbc_driver_checksum;
- jdbc_param.resource_name = t_jdbc_sink.jdbc_table.jdbc_resource_name;
- jdbc_param.table_type = t_jdbc_sink.table_type;
- jdbc_param.query_string = t_jdbc_sink.insert_sql;
- jdbc_param.table_name = t_jdbc_sink.jdbc_table.jdbc_table_name;
- jdbc_param.use_transaction = t_jdbc_sink.use_transaction;
- jdbc_param.connection_pool_min_size =
t_jdbc_sink.jdbc_table.connection_pool_min_size;
- jdbc_param.connection_pool_max_size =
t_jdbc_sink.jdbc_table.connection_pool_max_size;
- jdbc_param.connection_pool_max_wait_time =
t_jdbc_sink.jdbc_table.connection_pool_max_wait_time;
- jdbc_param.connection_pool_max_life_time =
t_jdbc_sink.jdbc_table.connection_pool_max_life_time;
- jdbc_param.connection_pool_keep_alive =
t_jdbc_sink.jdbc_table.connection_pool_keep_alive;
-
- return jdbc_param;
+ params["jdbc_url"] = t_jdbc_sink.jdbc_table.jdbc_url;
+ params["jdbc_user"] = t_jdbc_sink.jdbc_table.jdbc_user;
+ params["jdbc_password"] = t_jdbc_sink.jdbc_table.jdbc_password;
+ params["jdbc_driver_class"] = t_jdbc_sink.jdbc_table.jdbc_driver_class;
+ params["jdbc_driver_url"] = t_jdbc_sink.jdbc_table.jdbc_driver_url;
+ params["jdbc_driver_checksum"] =
t_jdbc_sink.jdbc_table.jdbc_driver_checksum;
+ params["insert_sql"] = t_jdbc_sink.insert_sql;
+ params["use_transaction"] = t_jdbc_sink.use_transaction ? "true" : "false";
+ params["catalog_id"] = std::to_string(t_jdbc_sink.jdbc_table.catalog_id);
+ params["connection_pool_min_size"] =
+ std::to_string(t_jdbc_sink.jdbc_table.connection_pool_min_size);
+ params["connection_pool_max_size"] =
+ std::to_string(t_jdbc_sink.jdbc_table.connection_pool_max_size);
+ params["connection_pool_max_wait_time"] =
+
std::to_string(t_jdbc_sink.jdbc_table.connection_pool_max_wait_time);
+ params["connection_pool_max_life_time"] =
+
std::to_string(t_jdbc_sink.jdbc_table.connection_pool_max_life_time);
+ params["connection_pool_keep_alive"] =
+ t_jdbc_sink.jdbc_table.connection_pool_keep_alive ? "true" :
"false";
+
+ return params;
}
VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink,
const VExprContextSPtrs& output_expr_ctxs,
std::shared_ptr<pipeline::Dependency> dep,
std::shared_ptr<pipeline::Dependency>
fin_dep)
: AsyncResultWriter(output_expr_ctxs, dep, fin_dep),
- JdbcConnector(create_connect_param(t_sink)) {}
+ _writer_params(_build_writer_params(t_sink)),
+ _use_transaction(t_sink.jdbc_table_sink.use_transaction) {}
+
+Status VJdbcTableWriter::open(RuntimeState* state, RuntimeProfile*
operator_profile) {
+ _writer = std::make_unique<VJniFormatTransformer>(
+ state, _vec_output_expr_ctxs,
"org/apache/doris/jdbc/JdbcJniWriter", _writer_params);
+ return _writer->open();
+}
Status VJdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) {
Block output_block;
RETURN_IF_ERROR(_projection_block(block, &output_block));
- auto num_rows = output_block.rows();
-
- uint32_t start_send_row = 0;
- uint32_t num_row_sent = 0;
- while (start_send_row < num_rows) {
- RETURN_IF_ERROR(append(&output_block, _vec_output_expr_ctxs,
start_send_row, &num_row_sent,
- _conn_param.table_type));
- start_send_row += num_row_sent;
- num_row_sent = 0;
+
+ if (output_block.rows() == 0) {
+ return Status::OK();
+ }
+
+ return _writer->write(output_block);
+}
+
+Status VJdbcTableWriter::finish(RuntimeState* state) {
+ if (!_use_transaction || !_writer) {
+ return Status::OK();
}
+ // Call commitTrans on the Java JdbcJniWriter via JNI
+ // VJniFormatTransformer manages the JNI writer object, so we use
get_statistics
+ // to check for errors. The actual commit is done as part of close() on
the Java side
+ // when use_transaction is true.
+ // TODO: Add explicit commitTrans JNI call support to VJniFormatTransformer
Review Comment:
**Bug: Transaction commit is a no-op — data silently lost** — When
`use_transaction=true`, `JdbcJniWriter.open()` calls `setAutoCommit(false)`,
but this `finish()` method returns `Status::OK()` without ever calling
`commitTrans()` on the Java side. The subsequent `close()` in
`JdbcJniWriter.java` only closes the PreparedStatement and Connection — it does
NOT commit the transaction. This means **all data written with transactions
enabled will be silently rolled back**.
This needs an explicit JNI call to `commitTrans()` before `close()`, or the
Java-side `close()` must be updated to commit when `useTransaction` is true.
The TODO comment acknowledges this but the current code will cause silent
data loss in production.
##########
be/src/service/internal_service.cpp:
##########
@@ -979,6 +974,33 @@ void
PInternalService::tablet_fetch_data(google::protobuf::RpcController* contro
}
}
+// Resolve JDBC driver URL to a file:// path.
+// Handles relative paths by looking in configured/default driver directories.
+static Status _resolve_jdbc_driver_url(const std::string& url, std::string*
result_url) {
+ if (url.find(":/") != std::string::npos) {
+ *result_url = url;
+ return Status::OK();
+ }
+ const char* doris_home = std::getenv("DORIS_HOME");
Review Comment:
**Bug: Null pointer dereference** — `std::getenv("DORIS_HOME")` returns
`nullptr` if the environment variable is not set. The next line passes this
directly to `std::string(doris_home)` which invokes undefined behavior (and in
practice crashes) when `doris_home` is null.
```cpp
const char* doris_home = std::getenv("DORIS_HOME");
if (doris_home == nullptr) {
return Status::InternalError("DORIS_HOME environment variable is not
set");
}
```
##########
be/src/service/internal_service.cpp:
##########
@@ -867,11 +866,7 @@ void
PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
file_slots,
io_ctx.get(), io_ctx);
break;
}
- case TFileFormatType::FORMAT_AVRO: {
- reader = vectorized::AvroJNIReader::create_unique(profile.get(),
params, range,
- file_slots);
- break;
- }
+
default:
Review Comment:
**Regression: Avro schema discovery broken** — The `FORMAT_AVRO` case was
removed here (along with the deletion of `avro_jni_reader.cpp/h`), but no
replacement was added. `fetch_table_schema` for Avro files will now hit this
`default` case and return `"Not supported file format"`, breaking Avro schema
discovery.
If Avro support is being moved to a different code path, that should be
included in this PR. Otherwise, this case needs to be preserved or migrated to
the new JniReader framework.
##########
fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcJniWriter.java:
##########
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jdbc;
+
+import org.apache.doris.cloud.security.SecurityChecker;
+import org.apache.doris.common.jni.JniWriter;
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.VectorColumn;
+import org.apache.doris.common.jni.vec.VectorTable;
+
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * JdbcJniWriter writes C++ Block data to JDBC targets via PreparedStatement
batch inserts.
+ * It extends JniWriter to integrate with the unified VJniFormatTransformer
framework,
+ * following the same pattern as MaxComputeJniWriter.
+ *
+ * <p>Lifecycle (managed by C++ VJniFormatTransformer):
+ * <pre>
+ * open() -> write() [repeated] -> close()
+ * </pre>
+ *
+ * <p>Transaction control is exposed via getStatistics() responses and
+ * additional JNI method calls from C++ side.
+ *
+ * <p>Parameters (passed via constructor params map):
+ * <ul>
+ * <li>jdbc_url - JDBC connection URL</li>
+ * <li>jdbc_user - database user</li>
+ * <li>jdbc_password - database password</li>
+ * <li>jdbc_driver_class - JDBC driver class name</li>
+ * <li>jdbc_driver_url - path to driver JAR</li>
+ * <li>jdbc_driver_checksum - MD5 checksum of driver JAR</li>
+ * <li>insert_sql - INSERT SQL with ? placeholders</li>
+ * <li>use_transaction - "true"/"false"</li>
+ * <li>catalog_id - catalog ID for connection pool keying</li>
+ * <li>connection_pool_min_size - min pool size</li>
+ * <li>connection_pool_max_size - max pool size</li>
+ * <li>connection_pool_max_wait_time - max wait time (ms)</li>
+ * <li>connection_pool_max_life_time - max lifetime (ms)</li>
+ * <li>connection_pool_keep_alive - "true"/"false"</li>
+ * </ul>
+ */
+public class JdbcJniWriter extends JniWriter {
+ private static final Logger LOG = Logger.getLogger(JdbcJniWriter.class);
+
+ private final String jdbcUrl;
+ private final String jdbcUser;
+ private final String jdbcPassword;
+ private final String jdbcDriverClass;
+ private final String jdbcDriverUrl;
+ private final String jdbcDriverChecksum;
+ private final String insertSql;
+ private final boolean useTransaction;
+ private final long catalogId;
+ private final int connectionPoolMinSize;
+ private final int connectionPoolMaxSize;
+ private final int connectionPoolMaxWaitTime;
+ private final int connectionPoolMaxLifeTime;
+ private final boolean connectionPoolKeepAlive;
+
+ private HikariDataSource hikariDataSource = null;
+ private Connection conn = null;
+ private PreparedStatement preparedStatement = null;
+ private ClassLoader classLoader = null;
+
+ // Statistics
+ private long writtenRows = 0;
+ private long insertTime = 0;
+
+ public JdbcJniWriter(int batchSize, Map<String, String> params) {
+ super(batchSize, params);
+ this.jdbcUrl = params.getOrDefault("jdbc_url", "");
+ this.jdbcUser = params.getOrDefault("jdbc_user", "");
+ this.jdbcPassword = params.getOrDefault("jdbc_password", "");
+ this.jdbcDriverClass = params.getOrDefault("jdbc_driver_class", "");
+ this.jdbcDriverUrl = params.getOrDefault("jdbc_driver_url", "");
+ this.jdbcDriverChecksum = params.getOrDefault("jdbc_driver_checksum",
"");
+ this.insertSql = params.getOrDefault("insert_sql", "");
+ this.useTransaction =
"true".equalsIgnoreCase(params.getOrDefault("use_transaction", "false"));
+ this.catalogId = Long.parseLong(params.getOrDefault("catalog_id",
"0"));
+ this.connectionPoolMinSize =
Integer.parseInt(params.getOrDefault("connection_pool_min_size", "1"));
+ this.connectionPoolMaxSize =
Integer.parseInt(params.getOrDefault("connection_pool_max_size", "10"));
+ this.connectionPoolMaxWaitTime = Integer.parseInt(
+ params.getOrDefault("connection_pool_max_wait_time", "5000"));
+ this.connectionPoolMaxLifeTime = Integer.parseInt(
+ params.getOrDefault("connection_pool_max_life_time",
"1800000"));
+ this.connectionPoolKeepAlive = "true".equalsIgnoreCase(
+ params.getOrDefault("connection_pool_keep_alive", "false"));
+ }
+
+ @Override
+ public void open() throws IOException {
+ ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
+ try {
+ initializeClassLoaderAndDataSource();
+ Thread.currentThread().setContextClassLoader(classLoader);
+
+ conn = hikariDataSource.getConnection();
+
+ if (useTransaction) {
+ conn.setAutoCommit(false);
+ }
+
+ LOG.info("JdbcJniWriter: Preparing insert statement: " +
insertSql);
+ preparedStatement = conn.prepareStatement(insertSql);
+ } catch (Exception e) {
+ throw new IOException("JdbcJniWriter open failed: " +
e.getMessage(), e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(oldClassLoader);
+ }
+ }
+
+ @Override
+ protected void writeInternal(VectorTable inputTable) throws IOException {
+ try {
+ long startInsert = System.nanoTime();
+ int numRows = inputTable.getNumRows();
+ VectorColumn[] columns = inputTable.getColumns();
+
+ for (int i = 0; i < numRows; ++i) {
+ for (int j = 0; j < columns.length; ++j) {
+ insertColumn(i, j, columns[j]);
+ }
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ preparedStatement.clearBatch();
+
+ insertTime += System.nanoTime() - startInsert;
+ writtenRows += numRows;
+ } catch (SQLException e) {
+ throw new IOException("JdbcJniWriter write failed: " +
e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ // Commit transaction if enabled (C++ side calls commitTrans via
JNI before close)
+ if (preparedStatement != null && !preparedStatement.isClosed()) {
+ preparedStatement.close();
+ }
+ if (conn != null && !conn.isClosed()) {
+ conn.close();
+ }
+ } catch (SQLException e) {
+ throw new IOException("JdbcJniWriter close failed: " +
e.getMessage(), e);
+ } finally {
+ preparedStatement = null;
+ conn = null;
+ if (connectionPoolMinSize == 0 && hikariDataSource != null) {
+ hikariDataSource.close();
+ JdbcDataSource.getDataSource().getSourcesMap()
+ .remove(createCacheKey());
+ hikariDataSource = null;
+ }
+ }
+ }
+
+ // === Transaction control methods (called by C++ via JNI) ===
+
+ public void openTrans() throws IOException {
+ try {
+ if (conn != null) {
+ conn.setAutoCommit(false);
+ }
+ } catch (SQLException e) {
+ throw new IOException("JdbcJniWriter openTrans failed: " +
e.getMessage(), e);
+ }
+ }
+
+ public void commitTrans() throws IOException {
+ try {
+ if (conn != null) {
+ conn.commit();
+ }
+ } catch (SQLException e) {
+ throw new IOException("JdbcJniWriter commitTrans failed: " +
e.getMessage(), e);
+ }
+ }
+
+ public void rollbackTrans() throws IOException {
+ try {
+ if (conn != null) {
+ conn.rollback();
+ }
+ } catch (SQLException e) {
+ throw new IOException("JdbcJniWriter rollbackTrans failed: " +
e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public Map<String, String> getStatistics() {
+ Map<String, String> stats = new HashMap<>();
+ stats.put("counter:WrittenRows", String.valueOf(writtenRows));
+ stats.put("timer:InsertTime", String.valueOf(insertTime));
+ stats.put("timer:WriteTime", String.valueOf(writeTime));
+ stats.put("timer:ReadTableTime", String.valueOf(readTableTime));
+ return stats;
+ }
+
+ // =====================================================================
+ // Private helpers — adapted from BaseJdbcExecutor.insert/insertColumn
+ // =====================================================================
+
+ private void insertColumn(int rowIdx, int colIdx, VectorColumn column)
throws SQLException {
+ int parameterIndex = colIdx + 1;
+ ColumnType.Type dorisType = column.getColumnPrimitiveType();
+ if (column.isNullAt(rowIdx)) {
+ insertNullColumn(parameterIndex, dorisType);
+ return;
+ }
+ switch (dorisType) {
+ case BOOLEAN:
+ preparedStatement.setBoolean(parameterIndex,
column.getBoolean(rowIdx));
+ break;
+ case TINYINT:
+ preparedStatement.setByte(parameterIndex,
column.getByte(rowIdx));
+ break;
+ case SMALLINT:
+ preparedStatement.setShort(parameterIndex,
column.getShort(rowIdx));
+ break;
+ case INT:
+ preparedStatement.setInt(parameterIndex,
column.getInt(rowIdx));
+ break;
+ case BIGINT:
+ preparedStatement.setLong(parameterIndex,
column.getLong(rowIdx));
+ break;
+ case LARGEINT:
+ preparedStatement.setObject(parameterIndex,
column.getBigInteger(rowIdx));
+ break;
+ case FLOAT:
+ preparedStatement.setFloat(parameterIndex,
column.getFloat(rowIdx));
+ break;
+ case DOUBLE:
+ preparedStatement.setDouble(parameterIndex,
column.getDouble(rowIdx));
+ break;
+ case DECIMALV2:
+ case DECIMAL32:
+ case DECIMAL64:
+ case DECIMAL128:
+ preparedStatement.setBigDecimal(parameterIndex,
column.getDecimal(rowIdx));
+ break;
+ case DATEV2:
+ preparedStatement.setDate(parameterIndex,
Date.valueOf(column.getDate(rowIdx)));
+ break;
+ case DATETIMEV2:
+ preparedStatement.setTimestamp(
+ parameterIndex,
Timestamp.valueOf(column.getDateTime(rowIdx)));
+ break;
+ case TIMESTAMPTZ:
+ preparedStatement.setObject(
+ parameterIndex,
Timestamp.valueOf(column.getTimeStampTz(rowIdx)));
+ break;
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ preparedStatement.setString(parameterIndex,
column.getStringWithOffset(rowIdx));
+ break;
+ case BINARY:
+ case VARBINARY:
+ preparedStatement.setBytes(parameterIndex,
column.getBytesVarbinary(rowIdx));
+ break;
+ default:
+ throw new RuntimeException("Unknown type value: " + dorisType);
+ }
+ }
+
+ private void insertNullColumn(int parameterIndex, ColumnType.Type
dorisType)
+ throws SQLException {
+ switch (dorisType) {
+ case BOOLEAN:
+ preparedStatement.setNull(parameterIndex, Types.BOOLEAN);
+ break;
+ case TINYINT:
+ preparedStatement.setNull(parameterIndex, Types.TINYINT);
+ break;
+ case SMALLINT:
+ preparedStatement.setNull(parameterIndex, Types.SMALLINT);
+ break;
+ case INT:
+ preparedStatement.setNull(parameterIndex, Types.INTEGER);
+ break;
+ case BIGINT:
+ preparedStatement.setNull(parameterIndex, Types.BIGINT);
+ break;
+ case LARGEINT:
+ preparedStatement.setNull(parameterIndex, Types.JAVA_OBJECT);
+ break;
+ case FLOAT:
+ preparedStatement.setNull(parameterIndex, Types.FLOAT);
+ break;
+ case DOUBLE:
+ preparedStatement.setNull(parameterIndex, Types.DOUBLE);
+ break;
+ case DECIMALV2:
+ case DECIMAL32:
+ case DECIMAL64:
+ case DECIMAL128:
+ preparedStatement.setNull(parameterIndex, Types.DECIMAL);
+ break;
+ case DATEV2:
+ preparedStatement.setNull(parameterIndex, Types.DATE);
+ break;
+ case DATETIMEV2:
+ preparedStatement.setNull(parameterIndex, Types.TIMESTAMP);
+ break;
+ case TIMESTAMPTZ:
+ preparedStatement.setNull(parameterIndex,
Types.TIMESTAMP_WITH_TIMEZONE);
+ break;
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ preparedStatement.setNull(parameterIndex, Types.VARCHAR);
+ break;
+ case BINARY:
+ case VARBINARY:
+ preparedStatement.setNull(parameterIndex, Types.VARBINARY);
+ break;
+ default:
+ throw new RuntimeException("Unknown type value: " + dorisType);
+ }
+ }
+
+ private void initializeClassLoaderAndDataSource() throws Exception {
+ java.net.URL[] urls = {new java.net.URL(jdbcDriverUrl)};
+ ClassLoader parent = getClass().getClassLoader();
+ this.classLoader = java.net.URLClassLoader.newInstance(urls, parent);
+
+ String cacheKey = createCacheKey();
+ hikariDataSource = JdbcDataSource.getDataSource().getSource(cacheKey);
+ if (hikariDataSource == null) {
+ synchronized (JdbcJniWriter.class) {
+ hikariDataSource =
JdbcDataSource.getDataSource().getSource(cacheKey);
+ if (hikariDataSource == null) {
+ HikariDataSource ds = new HikariDataSource();
+ ds.setDriverClassName(jdbcDriverClass);
+
ds.setJdbcUrl(SecurityChecker.getInstance().getSafeJdbcUrl(jdbcUrl));
+ ds.setUsername(jdbcUser);
+ ds.setPassword(jdbcPassword);
+ ds.setMinimumIdle(connectionPoolMinSize);
+ ds.setMaximumPoolSize(connectionPoolMaxSize);
+ ds.setConnectionTimeout(connectionPoolMaxWaitTime);
+ ds.setMaxLifetime(connectionPoolMaxLifeTime);
+ ds.setIdleTimeout(connectionPoolMaxLifeTime / 2L);
+ ds.setConnectionTestQuery("SELECT 1");
Review Comment:
**Bug: Hardcoded validation query fails on some databases** — `"SELECT 1"`
is not valid on all databases (e.g., Oracle requires `SELECT 1 FROM DUAL`, DB2
uses `SELECT 1 FROM SYSIBM.SYSDUMMY1`).
The sibling class `JdbcJniScanner` correctly handles this via
`typeHandler.setValidationQuery(ds)`, but this class hardcodes the query. This
should use the same `JdbcTypeHandler.setValidationQuery()` mechanism:
```java
JdbcTypeHandler typeHandler = JdbcTypeHandlerFactory.create(tableType);
typeHandler.setValidationQuery(ds);
```
##########
fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcJniScanner.java:
##########
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jdbc;
+
+import org.apache.doris.cloud.security.SecurityChecker;
+import org.apache.doris.common.jni.JniScanner;
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ColumnValueConverter;
+
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * JdbcJniScanner reads data from JDBC sources via the unified JniScanner
framework.
+ * It extends JniScanner to integrate with the JniConnector/JniReader system
on the C++ side,
+ * following the same pattern as PaimonJniScanner, HudiJniScanner, etc.
+ *
+ * <p>This class uses the {@link JdbcTypeHandler} strategy pattern for
database-specific
+ * type handling. The appropriate handler is selected via {@link
JdbcTypeHandlerFactory}
+ * based on the "table_type" parameter.
+ *
+ * <p>Parameters (passed via constructor params map):
+ * <ul>
+ * <li>jdbc_url - JDBC connection URL</li>
+ * <li>jdbc_user - database user</li>
+ * <li>jdbc_password - database password</li>
+ * <li>jdbc_driver_class - JDBC driver class name</li>
+ * <li>jdbc_driver_url - path to driver JAR</li>
+ * <li>query_sql - the SELECT SQL to execute</li>
+ * <li>catalog_id - catalog ID for connection pool keying</li>
+ * <li>table_type - database type (MYSQL, ORACLE, POSTGRESQL, etc.)</li>
+ * <li>connection_pool_min_size - min connection pool size</li>
+ * <li>connection_pool_max_size - max connection pool size</li>
+ * <li>connection_pool_max_wait_time - max wait time (ms)</li>
+ * <li>connection_pool_max_life_time - max lifetime (ms)</li>
+ * <li>connection_pool_keep_alive - "true"/"false"</li>
+ * <li>required_fields - comma-separated output field names</li>
+ * <li>columns_types - #-separated column type strings</li>
+ * </ul>
+ */
+public class JdbcJniScanner extends JniScanner {
+ private static final Logger LOG = Logger.getLogger(JdbcJniScanner.class);
+
+ private final String jdbcUrl;
+ private final String jdbcUser;
+ private final String jdbcPassword;
+ private final String jdbcDriverClass;
+ private final String jdbcDriverUrl;
+ private final String querySql;
+ private final long catalogId;
+ private final int connectionPoolMinSize;
+ private final int connectionPoolMaxSize;
+ private final int connectionPoolMaxWaitTime;
+ private final int connectionPoolMaxLifeTime;
+ private final boolean connectionPoolKeepAlive;
+
+ // Database-specific type handling strategy
+ private final JdbcTypeHandler typeHandler;
+ // Per-column output converters, initialized once per scan
+ private ColumnValueConverter[] outputConverters;
+
+ private HikariDataSource hikariDataSource = null;
+ private Connection conn = null;
+ private PreparedStatement stmt = null;
+ private ResultSet resultSet = null;
+ private ResultSetMetaData resultSetMetaData = null;
+ private ClassLoader classLoader = null;
+
+ // Read state
+ private boolean resultSetOpened = false;
+ private List<Object[]> block = null;
+
+ // Statistics
+ private long readRows = 0;
+ private long readTime = 0;
+
+ public JdbcJniScanner(int batchSize, Map<String, String> params) {
+ this.jdbcUrl = params.getOrDefault("jdbc_url", "");
+ this.jdbcUser = params.getOrDefault("jdbc_user", "");
+ this.jdbcPassword = params.getOrDefault("jdbc_password", "");
+ this.jdbcDriverClass = params.getOrDefault("jdbc_driver_class", "");
+ this.jdbcDriverUrl = params.getOrDefault("jdbc_driver_url", "");
+ this.querySql = params.getOrDefault("query_sql", "");
+ this.catalogId = Long.parseLong(params.getOrDefault("catalog_id",
"0"));
+ this.connectionPoolMinSize = Integer.parseInt(
+ params.getOrDefault("connection_pool_min_size", "1"));
+ this.connectionPoolMaxSize = Integer.parseInt(
+ params.getOrDefault("connection_pool_max_size", "10"));
+ this.connectionPoolMaxWaitTime = Integer.parseInt(
+ params.getOrDefault("connection_pool_max_wait_time", "5000"));
+ this.connectionPoolMaxLifeTime = Integer.parseInt(
+ params.getOrDefault("connection_pool_max_life_time",
"1800000"));
+ this.connectionPoolKeepAlive = "true".equalsIgnoreCase(
+ params.getOrDefault("connection_pool_keep_alive", "false"));
+
+ // Select database-specific type handler
+ String tableType = params.getOrDefault("table_type", "");
+ this.typeHandler = JdbcTypeHandlerFactory.create(tableType);
+
+ String requiredFields = params.getOrDefault("required_fields", "");
+ String columnsTypes = params.getOrDefault("columns_types", "");
+ String[] fieldArr = requiredFields.isEmpty() ? new String[0] :
requiredFields.split(",");
+ ColumnType[] typeArr;
+ if (columnsTypes.isEmpty()) {
+ typeArr = new ColumnType[0];
+ } else {
+ String[] typeStrs = columnsTypes.split("#");
+ typeArr = new ColumnType[typeStrs.length];
+ for (int i = 0; i < typeStrs.length; i++) {
+ typeArr[i] = ColumnType.parseType(fieldArr[i], typeStrs[i]);
+ }
+ }
+ initTableInfo(typeArr, fieldArr, batchSize);
+ }
+
+ @Override
+ public void open() throws IOException {
+ ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
+ try {
+ // Set JVM-level driver properties
+ typeHandler.setSystemProperties();
+
+ initializeClassLoaderAndDataSource();
+ Thread.currentThread().setContextClassLoader(classLoader);
+
+ conn = hikariDataSource.getConnection();
+
+ // Use type handler to create the statement with database-specific
settings
+ stmt = typeHandler.initializeStatement(conn, querySql, batchSize);
+
+ LOG.info("JdbcJniScanner: Executing query: " + querySql);
Review Comment:
**Issue: Query SQL logged at INFO level** — This logs every executed query
at INFO level, which can:
1. Expose sensitive data in queries (WHERE clauses with PII, etc.)
2. Create excessive log volume in production with high query rates
This should be `LOG.debug()`; if it must remain at INFO level, the SQL
should be truncated or redacted before logging.
##########
be/src/vec/exec/format/jni_reader.cpp:
##########
@@ -36,36 +43,336 @@ class Block;
namespace doris::vectorized {
-MockJniReader::MockJniReader(const std::vector<SlotDescriptor*>&
file_slot_descs,
- RuntimeState* state, RuntimeProfile* profile)
- : JniReader(file_slot_descs, state, profile) {
- std::ostringstream required_fields;
- std::ostringstream columns_types;
- std::vector<std::string> column_names;
- int index = 0;
- for (const auto& desc : _file_slot_descs) {
- std::string field = desc->col_name();
- std::string type =
JniConnector::get_jni_type_with_different_string(desc->type());
- column_names.emplace_back(field);
- if (index == 0) {
- required_fields << field;
- columns_types << type;
- } else {
- required_fields << "," << field;
- columns_types << "#" << type;
+// =========================================================================
+// JniReader constructors
+// =========================================================================
+
+JniReader::JniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
RuntimeState* state,
+ RuntimeProfile* profile, std::string connector_class,
+ std::map<std::string, std::string> scanner_params,
+ std::vector<std::string> column_names, int64_t
self_split_weight)
+ : _file_slot_descs(file_slot_descs),
+ _state(state),
+ _profile(profile),
+ _connector_class(std::move(connector_class)),
+ _scanner_params(std::move(scanner_params)),
+ _column_names(std::move(column_names)),
+ _self_split_weight(static_cast<int32_t>(self_split_weight)) {
+ _connector_name = split(_connector_class, "/").back();
+}
+
+JniReader::JniReader(std::string connector_class, std::map<std::string,
std::string> scanner_params)
+ : _file_slot_descs(*(new std::vector<SlotDescriptor*>())),
Review Comment:
**Bug: Memory leak** — This constructor heap-allocates a `std::vector` via
`new` and stores it as a `const` reference (`_file_slot_descs`), but the
allocated memory is **never freed**. Every call to this constructor (used by
Avro schema discovery and `JdbcConnectionTester` paths) leaks one `std::vector`
object.
Suggested fix: Use an empty static vector or an owned member:
```cpp
// Option 1: static empty vector (if _file_slot_descs is truly unused)
static const std::vector<SlotDescriptor*> kEmptySlots;
JniReader::JniReader(std::string connector_class, std::map<std::string,
std::string> scanner_params)
: _file_slot_descs(kEmptySlots), ...
// Option 2: Change _file_slot_descs from const& to value type
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]