github-actions[bot] commented on code in PR #61084:
URL: https://github.com/apache/doris/pull/61084#discussion_r2893868042


##########
be/src/vec/sink/writer/vjdbc_table_writer.cpp:
##########
@@ -22,61 +22,83 @@
 
 #include <sstream>
 
-#include "util/binary_cast.hpp"
+#include "runtime/runtime_state.h"
 #include "vec/core/block.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
 
 namespace doris {
 namespace vectorized {
 
-JdbcConnectorParam VJdbcTableWriter::create_connect_param(const 
doris::TDataSink& t_sink) {
+std::map<std::string, std::string> 
VJdbcTableWriter::_build_writer_params(const TDataSink& t_sink) {
     const TJdbcTableSink& t_jdbc_sink = t_sink.jdbc_table_sink;
+    std::map<std::string, std::string> params;
 
-    JdbcConnectorParam jdbc_param;
-
-    jdbc_param.catalog_id = t_jdbc_sink.jdbc_table.catalog_id;
-    jdbc_param.jdbc_url = t_jdbc_sink.jdbc_table.jdbc_url;
-    jdbc_param.user = t_jdbc_sink.jdbc_table.jdbc_user;
-    jdbc_param.passwd = t_jdbc_sink.jdbc_table.jdbc_password;
-    jdbc_param.driver_class = t_jdbc_sink.jdbc_table.jdbc_driver_class;
-    jdbc_param.driver_path = t_jdbc_sink.jdbc_table.jdbc_driver_url;
-    jdbc_param.driver_checksum = t_jdbc_sink.jdbc_table.jdbc_driver_checksum;
-    jdbc_param.resource_name = t_jdbc_sink.jdbc_table.jdbc_resource_name;
-    jdbc_param.table_type = t_jdbc_sink.table_type;
-    jdbc_param.query_string = t_jdbc_sink.insert_sql;
-    jdbc_param.table_name = t_jdbc_sink.jdbc_table.jdbc_table_name;
-    jdbc_param.use_transaction = t_jdbc_sink.use_transaction;
-    jdbc_param.connection_pool_min_size = 
t_jdbc_sink.jdbc_table.connection_pool_min_size;
-    jdbc_param.connection_pool_max_size = 
t_jdbc_sink.jdbc_table.connection_pool_max_size;
-    jdbc_param.connection_pool_max_wait_time = 
t_jdbc_sink.jdbc_table.connection_pool_max_wait_time;
-    jdbc_param.connection_pool_max_life_time = 
t_jdbc_sink.jdbc_table.connection_pool_max_life_time;
-    jdbc_param.connection_pool_keep_alive = 
t_jdbc_sink.jdbc_table.connection_pool_keep_alive;
-
-    return jdbc_param;
+    params["jdbc_url"] = t_jdbc_sink.jdbc_table.jdbc_url;
+    params["jdbc_user"] = t_jdbc_sink.jdbc_table.jdbc_user;
+    params["jdbc_password"] = t_jdbc_sink.jdbc_table.jdbc_password;
+    params["jdbc_driver_class"] = t_jdbc_sink.jdbc_table.jdbc_driver_class;
+    params["jdbc_driver_url"] = t_jdbc_sink.jdbc_table.jdbc_driver_url;
+    params["jdbc_driver_checksum"] = 
t_jdbc_sink.jdbc_table.jdbc_driver_checksum;
+    params["insert_sql"] = t_jdbc_sink.insert_sql;
+    params["use_transaction"] = t_jdbc_sink.use_transaction ? "true" : "false";
+    params["catalog_id"] = std::to_string(t_jdbc_sink.jdbc_table.catalog_id);
+    params["connection_pool_min_size"] =
+            std::to_string(t_jdbc_sink.jdbc_table.connection_pool_min_size);
+    params["connection_pool_max_size"] =
+            std::to_string(t_jdbc_sink.jdbc_table.connection_pool_max_size);
+    params["connection_pool_max_wait_time"] =
+            
std::to_string(t_jdbc_sink.jdbc_table.connection_pool_max_wait_time);
+    params["connection_pool_max_life_time"] =
+            
std::to_string(t_jdbc_sink.jdbc_table.connection_pool_max_life_time);
+    params["connection_pool_keep_alive"] =
+            t_jdbc_sink.jdbc_table.connection_pool_keep_alive ? "true" : 
"false";
+
+    return params;
 }
 
 VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink,
                                    const VExprContextSPtrs& output_expr_ctxs,
                                    std::shared_ptr<pipeline::Dependency> dep,
                                    std::shared_ptr<pipeline::Dependency> 
fin_dep)
         : AsyncResultWriter(output_expr_ctxs, dep, fin_dep),
-          JdbcConnector(create_connect_param(t_sink)) {}
+          _writer_params(_build_writer_params(t_sink)),
+          _use_transaction(t_sink.jdbc_table_sink.use_transaction) {}
+
+Status VJdbcTableWriter::open(RuntimeState* state, RuntimeProfile* 
operator_profile) {
+    _writer = std::make_unique<VJniFormatTransformer>(
+            state, _vec_output_expr_ctxs, 
"org/apache/doris/jdbc/JdbcJniWriter", _writer_params);
+    return _writer->open();
+}
 
 Status VJdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) {
     Block output_block;
     RETURN_IF_ERROR(_projection_block(block, &output_block));
-    auto num_rows = output_block.rows();
-
-    uint32_t start_send_row = 0;
-    uint32_t num_row_sent = 0;
-    while (start_send_row < num_rows) {
-        RETURN_IF_ERROR(append(&output_block, _vec_output_expr_ctxs, 
start_send_row, &num_row_sent,
-                               _conn_param.table_type));
-        start_send_row += num_row_sent;
-        num_row_sent = 0;
+
+    if (output_block.rows() == 0) {
+        return Status::OK();
+    }
+
+    return _writer->write(output_block);
+}
+
+Status VJdbcTableWriter::finish(RuntimeState* state) {
+    if (!_use_transaction || !_writer) {
+        return Status::OK();
     }
 
+    // Call commitTrans on the Java JdbcJniWriter via JNI
+    // VJniFormatTransformer manages the JNI writer object, so we use 
get_statistics
+    // to check for errors. The actual commit is done as part of close() on 
the Java side
+    // when use_transaction is true.
+    // TODO: Add explicit commitTrans JNI call support to VJniFormatTransformer

Review Comment:
   **Bug: Transaction commit is a no-op — data silently lost** — When 
`use_transaction=true`, `JdbcJniWriter.open()` calls `setAutoCommit(false)`, 
but this `finish()` method returns `Status::OK()` without ever calling 
`commitTrans()` on the Java side. The subsequent `close()` in 
`JdbcJniWriter.java` only closes the PreparedStatement and Connection — it does 
NOT commit the transaction. This means **all data written with transactions 
enabled will be silently rolled back**.
   
   This needs an explicit JNI call to `commitTrans()` before `close()`, or the 
Java-side `close()` must be updated to commit when `useTransaction` is true.
   
   The TODO comment acknowledges this but the current code will cause silent 
data loss in production.



##########
be/src/service/internal_service.cpp:
##########
@@ -979,6 +974,33 @@ void 
PInternalService::tablet_fetch_data(google::protobuf::RpcController* contro
     }
 }
 
+// Resolve JDBC driver URL to a file:// path.
+// Handles relative paths by looking in configured/default driver directories.
+static Status _resolve_jdbc_driver_url(const std::string& url, std::string* 
result_url) {
+    if (url.find(":/") != std::string::npos) {
+        *result_url = url;
+        return Status::OK();
+    }
+    const char* doris_home = std::getenv("DORIS_HOME");

Review Comment:
   **Bug: Null pointer dereference** — `std::getenv("DORIS_HOME")` returns 
`nullptr` if the environment variable is not set. The next line passes this 
directly to `std::string(doris_home)` which invokes undefined behavior (and in 
practice crashes) when `doris_home` is null.
   
   ```cpp
   const char* doris_home = std::getenv("DORIS_HOME");
   if (doris_home == nullptr) {
       return Status::InternalError("DORIS_HOME environment variable is not 
set");
   }
   ```



##########
be/src/service/internal_service.cpp:
##########
@@ -867,11 +866,7 @@ void 
PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
                                                               file_slots, 
io_ctx.get(), io_ctx);
             break;
         }
-        case TFileFormatType::FORMAT_AVRO: {
-            reader = vectorized::AvroJNIReader::create_unique(profile.get(), 
params, range,
-                                                              file_slots);
-            break;
-        }
+
         default:

Review Comment:
   **Regression: Avro schema discovery broken** — The `FORMAT_AVRO` case was 
removed here (along with the deletion of `avro_jni_reader.cpp/h`), but no 
replacement was added. `fetch_table_schema` for Avro files will now hit this 
`default` case and return `"Not supported file format"`, breaking Avro schema 
discovery.
   
   If Avro support is being moved to a different code path, that should be 
included in this PR. Otherwise, this case needs to be preserved or migrated to 
the new JniReader framework.



##########
fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcJniWriter.java:
##########
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jdbc;
+
+import org.apache.doris.cloud.security.SecurityChecker;
+import org.apache.doris.common.jni.JniWriter;
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.VectorColumn;
+import org.apache.doris.common.jni.vec.VectorTable;
+
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * JdbcJniWriter writes C++ Block data to JDBC targets via PreparedStatement 
batch inserts.
+ * It extends JniWriter to integrate with the unified VJniFormatTransformer 
framework,
+ * following the same pattern as MaxComputeJniWriter.
+ *
+ * <p>Lifecycle (managed by C++ VJniFormatTransformer):
+ * <pre>
+ *   open() -> write() [repeated] -> close()
+ * </pre>
+ *
+ * <p>Transaction control is exposed via getStatistics() responses and
+ * additional JNI method calls from C++ side.
+ *
+ * <p>Parameters (passed via constructor params map):
+ * <ul>
+ *   <li>jdbc_url - JDBC connection URL</li>
+ *   <li>jdbc_user - database user</li>
+ *   <li>jdbc_password - database password</li>
+ *   <li>jdbc_driver_class - JDBC driver class name</li>
+ *   <li>jdbc_driver_url - path to driver JAR</li>
+ *   <li>jdbc_driver_checksum - MD5 checksum of driver JAR</li>
+ *   <li>insert_sql - INSERT SQL with ? placeholders</li>
+ *   <li>use_transaction - "true"/"false"</li>
+ *   <li>catalog_id - catalog ID for connection pool keying</li>
+ *   <li>connection_pool_min_size - min pool size</li>
+ *   <li>connection_pool_max_size - max pool size</li>
+ *   <li>connection_pool_max_wait_time - max wait time (ms)</li>
+ *   <li>connection_pool_max_life_time - max lifetime (ms)</li>
+ *   <li>connection_pool_keep_alive - "true"/"false"</li>
+ * </ul>
+ */
+public class JdbcJniWriter extends JniWriter {
+    private static final Logger LOG = Logger.getLogger(JdbcJniWriter.class);
+
+    private final String jdbcUrl;
+    private final String jdbcUser;
+    private final String jdbcPassword;
+    private final String jdbcDriverClass;
+    private final String jdbcDriverUrl;
+    private final String jdbcDriverChecksum;
+    private final String insertSql;
+    private final boolean useTransaction;
+    private final long catalogId;
+    private final int connectionPoolMinSize;
+    private final int connectionPoolMaxSize;
+    private final int connectionPoolMaxWaitTime;
+    private final int connectionPoolMaxLifeTime;
+    private final boolean connectionPoolKeepAlive;
+
+    private HikariDataSource hikariDataSource = null;
+    private Connection conn = null;
+    private PreparedStatement preparedStatement = null;
+    private ClassLoader classLoader = null;
+
+    // Statistics
+    private long writtenRows = 0;
+    private long insertTime = 0;
+
+    public JdbcJniWriter(int batchSize, Map<String, String> params) {
+        super(batchSize, params);
+        this.jdbcUrl = params.getOrDefault("jdbc_url", "");
+        this.jdbcUser = params.getOrDefault("jdbc_user", "");
+        this.jdbcPassword = params.getOrDefault("jdbc_password", "");
+        this.jdbcDriverClass = params.getOrDefault("jdbc_driver_class", "");
+        this.jdbcDriverUrl = params.getOrDefault("jdbc_driver_url", "");
+        this.jdbcDriverChecksum = params.getOrDefault("jdbc_driver_checksum", 
"");
+        this.insertSql = params.getOrDefault("insert_sql", "");
+        this.useTransaction = 
"true".equalsIgnoreCase(params.getOrDefault("use_transaction", "false"));
+        this.catalogId = Long.parseLong(params.getOrDefault("catalog_id", 
"0"));
+        this.connectionPoolMinSize = 
Integer.parseInt(params.getOrDefault("connection_pool_min_size", "1"));
+        this.connectionPoolMaxSize = 
Integer.parseInt(params.getOrDefault("connection_pool_max_size", "10"));
+        this.connectionPoolMaxWaitTime = Integer.parseInt(
+                params.getOrDefault("connection_pool_max_wait_time", "5000"));
+        this.connectionPoolMaxLifeTime = Integer.parseInt(
+                params.getOrDefault("connection_pool_max_life_time", 
"1800000"));
+        this.connectionPoolKeepAlive = "true".equalsIgnoreCase(
+                params.getOrDefault("connection_pool_keep_alive", "false"));
+    }
+
+    @Override
+    public void open() throws IOException {
+        ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
+        try {
+            initializeClassLoaderAndDataSource();
+            Thread.currentThread().setContextClassLoader(classLoader);
+
+            conn = hikariDataSource.getConnection();
+
+            if (useTransaction) {
+                conn.setAutoCommit(false);
+            }
+
+            LOG.info("JdbcJniWriter: Preparing insert statement: " + 
insertSql);
+            preparedStatement = conn.prepareStatement(insertSql);
+        } catch (Exception e) {
+            throw new IOException("JdbcJniWriter open failed: " + 
e.getMessage(), e);
+        } finally {
+            Thread.currentThread().setContextClassLoader(oldClassLoader);
+        }
+    }
+
+    @Override
+    protected void writeInternal(VectorTable inputTable) throws IOException {
+        try {
+            long startInsert = System.nanoTime();
+            int numRows = inputTable.getNumRows();
+            VectorColumn[] columns = inputTable.getColumns();
+
+            for (int i = 0; i < numRows; ++i) {
+                for (int j = 0; j < columns.length; ++j) {
+                    insertColumn(i, j, columns[j]);
+                }
+                preparedStatement.addBatch();
+            }
+            preparedStatement.executeBatch();
+            preparedStatement.clearBatch();
+
+            insertTime += System.nanoTime() - startInsert;
+            writtenRows += numRows;
+        } catch (SQLException e) {
+            throw new IOException("JdbcJniWriter write failed: " + 
e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            // Commit transaction if enabled (C++ side calls commitTrans via 
JNI before close)
+            if (preparedStatement != null && !preparedStatement.isClosed()) {
+                preparedStatement.close();
+            }
+            if (conn != null && !conn.isClosed()) {
+                conn.close();
+            }
+        } catch (SQLException e) {
+            throw new IOException("JdbcJniWriter close failed: " + 
e.getMessage(), e);
+        } finally {
+            preparedStatement = null;
+            conn = null;
+            if (connectionPoolMinSize == 0 && hikariDataSource != null) {
+                hikariDataSource.close();
+                JdbcDataSource.getDataSource().getSourcesMap()
+                        .remove(createCacheKey());
+                hikariDataSource = null;
+            }
+        }
+    }
+
+    // === Transaction control methods (called by C++ via JNI) ===
+
+    public void openTrans() throws IOException {
+        try {
+            if (conn != null) {
+                conn.setAutoCommit(false);
+            }
+        } catch (SQLException e) {
+            throw new IOException("JdbcJniWriter openTrans failed: " + 
e.getMessage(), e);
+        }
+    }
+
+    public void commitTrans() throws IOException {
+        try {
+            if (conn != null) {
+                conn.commit();
+            }
+        } catch (SQLException e) {
+            throw new IOException("JdbcJniWriter commitTrans failed: " + 
e.getMessage(), e);
+        }
+    }
+
+    public void rollbackTrans() throws IOException {
+        try {
+            if (conn != null) {
+                conn.rollback();
+            }
+        } catch (SQLException e) {
+            throw new IOException("JdbcJniWriter rollbackTrans failed: " + 
e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public Map<String, String> getStatistics() {
+        Map<String, String> stats = new HashMap<>();
+        stats.put("counter:WrittenRows", String.valueOf(writtenRows));
+        stats.put("timer:InsertTime", String.valueOf(insertTime));
+        stats.put("timer:WriteTime", String.valueOf(writeTime));
+        stats.put("timer:ReadTableTime", String.valueOf(readTableTime));
+        return stats;
+    }
+
+    // =====================================================================
+    // Private helpers — adapted from BaseJdbcExecutor.insert/insertColumn
+    // =====================================================================
+
+    private void insertColumn(int rowIdx, int colIdx, VectorColumn column) 
throws SQLException {
+        int parameterIndex = colIdx + 1;
+        ColumnType.Type dorisType = column.getColumnPrimitiveType();
+        if (column.isNullAt(rowIdx)) {
+            insertNullColumn(parameterIndex, dorisType);
+            return;
+        }
+        switch (dorisType) {
+            case BOOLEAN:
+                preparedStatement.setBoolean(parameterIndex, 
column.getBoolean(rowIdx));
+                break;
+            case TINYINT:
+                preparedStatement.setByte(parameterIndex, 
column.getByte(rowIdx));
+                break;
+            case SMALLINT:
+                preparedStatement.setShort(parameterIndex, 
column.getShort(rowIdx));
+                break;
+            case INT:
+                preparedStatement.setInt(parameterIndex, 
column.getInt(rowIdx));
+                break;
+            case BIGINT:
+                preparedStatement.setLong(parameterIndex, 
column.getLong(rowIdx));
+                break;
+            case LARGEINT:
+                preparedStatement.setObject(parameterIndex, 
column.getBigInteger(rowIdx));
+                break;
+            case FLOAT:
+                preparedStatement.setFloat(parameterIndex, 
column.getFloat(rowIdx));
+                break;
+            case DOUBLE:
+                preparedStatement.setDouble(parameterIndex, 
column.getDouble(rowIdx));
+                break;
+            case DECIMALV2:
+            case DECIMAL32:
+            case DECIMAL64:
+            case DECIMAL128:
+                preparedStatement.setBigDecimal(parameterIndex, 
column.getDecimal(rowIdx));
+                break;
+            case DATEV2:
+                preparedStatement.setDate(parameterIndex, 
Date.valueOf(column.getDate(rowIdx)));
+                break;
+            case DATETIMEV2:
+                preparedStatement.setTimestamp(
+                        parameterIndex, 
Timestamp.valueOf(column.getDateTime(rowIdx)));
+                break;
+            case TIMESTAMPTZ:
+                preparedStatement.setObject(
+                        parameterIndex, 
Timestamp.valueOf(column.getTimeStampTz(rowIdx)));
+                break;
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                preparedStatement.setString(parameterIndex, 
column.getStringWithOffset(rowIdx));
+                break;
+            case BINARY:
+            case VARBINARY:
+                preparedStatement.setBytes(parameterIndex, 
column.getBytesVarbinary(rowIdx));
+                break;
+            default:
+                throw new RuntimeException("Unknown type value: " + dorisType);
+        }
+    }
+
+    private void insertNullColumn(int parameterIndex, ColumnType.Type 
dorisType)
+            throws SQLException {
+        switch (dorisType) {
+            case BOOLEAN:
+                preparedStatement.setNull(parameterIndex, Types.BOOLEAN);
+                break;
+            case TINYINT:
+                preparedStatement.setNull(parameterIndex, Types.TINYINT);
+                break;
+            case SMALLINT:
+                preparedStatement.setNull(parameterIndex, Types.SMALLINT);
+                break;
+            case INT:
+                preparedStatement.setNull(parameterIndex, Types.INTEGER);
+                break;
+            case BIGINT:
+                preparedStatement.setNull(parameterIndex, Types.BIGINT);
+                break;
+            case LARGEINT:
+                preparedStatement.setNull(parameterIndex, Types.JAVA_OBJECT);
+                break;
+            case FLOAT:
+                preparedStatement.setNull(parameterIndex, Types.FLOAT);
+                break;
+            case DOUBLE:
+                preparedStatement.setNull(parameterIndex, Types.DOUBLE);
+                break;
+            case DECIMALV2:
+            case DECIMAL32:
+            case DECIMAL64:
+            case DECIMAL128:
+                preparedStatement.setNull(parameterIndex, Types.DECIMAL);
+                break;
+            case DATEV2:
+                preparedStatement.setNull(parameterIndex, Types.DATE);
+                break;
+            case DATETIMEV2:
+                preparedStatement.setNull(parameterIndex, Types.TIMESTAMP);
+                break;
+            case TIMESTAMPTZ:
+                preparedStatement.setNull(parameterIndex, 
Types.TIMESTAMP_WITH_TIMEZONE);
+                break;
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                preparedStatement.setNull(parameterIndex, Types.VARCHAR);
+                break;
+            case BINARY:
+            case VARBINARY:
+                preparedStatement.setNull(parameterIndex, Types.VARBINARY);
+                break;
+            default:
+                throw new RuntimeException("Unknown type value: " + dorisType);
+        }
+    }
+
+    private void initializeClassLoaderAndDataSource() throws Exception {
+        java.net.URL[] urls = {new java.net.URL(jdbcDriverUrl)};
+        ClassLoader parent = getClass().getClassLoader();
+        this.classLoader = java.net.URLClassLoader.newInstance(urls, parent);
+
+        String cacheKey = createCacheKey();
+        hikariDataSource = JdbcDataSource.getDataSource().getSource(cacheKey);
+        if (hikariDataSource == null) {
+            synchronized (JdbcJniWriter.class) {
+                hikariDataSource = 
JdbcDataSource.getDataSource().getSource(cacheKey);
+                if (hikariDataSource == null) {
+                    HikariDataSource ds = new HikariDataSource();
+                    ds.setDriverClassName(jdbcDriverClass);
+                    
ds.setJdbcUrl(SecurityChecker.getInstance().getSafeJdbcUrl(jdbcUrl));
+                    ds.setUsername(jdbcUser);
+                    ds.setPassword(jdbcPassword);
+                    ds.setMinimumIdle(connectionPoolMinSize);
+                    ds.setMaximumPoolSize(connectionPoolMaxSize);
+                    ds.setConnectionTimeout(connectionPoolMaxWaitTime);
+                    ds.setMaxLifetime(connectionPoolMaxLifeTime);
+                    ds.setIdleTimeout(connectionPoolMaxLifeTime / 2L);
+                    ds.setConnectionTestQuery("SELECT 1");

Review Comment:
   **Bug: Hardcoded validation query fails on some databases** — `"SELECT 1"` 
is not valid on all databases (e.g., Oracle requires `SELECT 1 FROM DUAL`, DB2 
uses `SELECT 1 FROM SYSIBM.SYSDUMMY1`).
   
   The sibling class `JdbcJniScanner` correctly handles this via 
`typeHandler.setValidationQuery(ds)`, but this class hardcodes the query. This 
should use the same `JdbcTypeHandler.setValidationQuery()` mechanism:
   ```java
   JdbcTypeHandler typeHandler = JdbcTypeHandlerFactory.create(tableType);
   typeHandler.setValidationQuery(ds);
   ```



##########
fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcJniScanner.java:
##########
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jdbc;
+
+import org.apache.doris.cloud.security.SecurityChecker;
+import org.apache.doris.common.jni.JniScanner;
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ColumnValueConverter;
+
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * JdbcJniScanner reads data from JDBC sources via the unified JniScanner 
framework.
+ * It extends JniScanner to integrate with the JniConnector/JniReader system 
on the C++ side,
+ * following the same pattern as PaimonJniScanner, HudiJniScanner, etc.
+ *
+ * <p>This class uses the {@link JdbcTypeHandler} strategy pattern for 
database-specific
+ * type handling. The appropriate handler is selected via {@link 
JdbcTypeHandlerFactory}
+ * based on the "table_type" parameter.
+ *
+ * <p>Parameters (passed via constructor params map):
+ * <ul>
+ *   <li>jdbc_url - JDBC connection URL</li>
+ *   <li>jdbc_user - database user</li>
+ *   <li>jdbc_password - database password</li>
+ *   <li>jdbc_driver_class - JDBC driver class name</li>
+ *   <li>jdbc_driver_url - path to driver JAR</li>
+ *   <li>query_sql - the SELECT SQL to execute</li>
+ *   <li>catalog_id - catalog ID for connection pool keying</li>
+ *   <li>table_type - database type (MYSQL, ORACLE, POSTGRESQL, etc.)</li>
+ *   <li>connection_pool_min_size - min connection pool size</li>
+ *   <li>connection_pool_max_size - max connection pool size</li>
+ *   <li>connection_pool_max_wait_time - max wait time (ms)</li>
+ *   <li>connection_pool_max_life_time - max lifetime (ms)</li>
+ *   <li>connection_pool_keep_alive - "true"/"false"</li>
+ *   <li>required_fields - comma-separated output field names</li>
+ *   <li>columns_types - #-separated column type strings</li>
+ * </ul>
+ */
+public class JdbcJniScanner extends JniScanner {
+    private static final Logger LOG = Logger.getLogger(JdbcJniScanner.class);
+
+    private final String jdbcUrl;
+    private final String jdbcUser;
+    private final String jdbcPassword;
+    private final String jdbcDriverClass;
+    private final String jdbcDriverUrl;
+    private final String querySql;
+    private final long catalogId;
+    private final int connectionPoolMinSize;
+    private final int connectionPoolMaxSize;
+    private final int connectionPoolMaxWaitTime;
+    private final int connectionPoolMaxLifeTime;
+    private final boolean connectionPoolKeepAlive;
+
+    // Database-specific type handling strategy
+    private final JdbcTypeHandler typeHandler;
+    // Per-column output converters, initialized once per scan
+    private ColumnValueConverter[] outputConverters;
+
+    private HikariDataSource hikariDataSource = null;
+    private Connection conn = null;
+    private PreparedStatement stmt = null;
+    private ResultSet resultSet = null;
+    private ResultSetMetaData resultSetMetaData = null;
+    private ClassLoader classLoader = null;
+
+    // Read state
+    private boolean resultSetOpened = false;
+    private List<Object[]> block = null;
+
+    // Statistics
+    private long readRows = 0;
+    private long readTime = 0;
+
+    public JdbcJniScanner(int batchSize, Map<String, String> params) {
+        this.jdbcUrl = params.getOrDefault("jdbc_url", "");
+        this.jdbcUser = params.getOrDefault("jdbc_user", "");
+        this.jdbcPassword = params.getOrDefault("jdbc_password", "");
+        this.jdbcDriverClass = params.getOrDefault("jdbc_driver_class", "");
+        this.jdbcDriverUrl = params.getOrDefault("jdbc_driver_url", "");
+        this.querySql = params.getOrDefault("query_sql", "");
+        this.catalogId = Long.parseLong(params.getOrDefault("catalog_id", 
"0"));
+        this.connectionPoolMinSize = Integer.parseInt(
+                params.getOrDefault("connection_pool_min_size", "1"));
+        this.connectionPoolMaxSize = Integer.parseInt(
+                params.getOrDefault("connection_pool_max_size", "10"));
+        this.connectionPoolMaxWaitTime = Integer.parseInt(
+                params.getOrDefault("connection_pool_max_wait_time", "5000"));
+        this.connectionPoolMaxLifeTime = Integer.parseInt(
+                params.getOrDefault("connection_pool_max_life_time", 
"1800000"));
+        this.connectionPoolKeepAlive = "true".equalsIgnoreCase(
+                params.getOrDefault("connection_pool_keep_alive", "false"));
+
+        // Select database-specific type handler
+        String tableType = params.getOrDefault("table_type", "");
+        this.typeHandler = JdbcTypeHandlerFactory.create(tableType);
+
+        String requiredFields = params.getOrDefault("required_fields", "");
+        String columnsTypes = params.getOrDefault("columns_types", "");
+        String[] fieldArr = requiredFields.isEmpty() ? new String[0] : 
requiredFields.split(",");
+        ColumnType[] typeArr;
+        if (columnsTypes.isEmpty()) {
+            typeArr = new ColumnType[0];
+        } else {
+            String[] typeStrs = columnsTypes.split("#");
+            typeArr = new ColumnType[typeStrs.length];
+            for (int i = 0; i < typeStrs.length; i++) {
+                typeArr[i] = ColumnType.parseType(fieldArr[i], typeStrs[i]);
+            }
+        }
+        initTableInfo(typeArr, fieldArr, batchSize);
+    }
+
+    @Override
+    public void open() throws IOException {
+        ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
+        try {
+            // Set JVM-level driver properties
+            typeHandler.setSystemProperties();
+
+            initializeClassLoaderAndDataSource();
+            Thread.currentThread().setContextClassLoader(classLoader);
+
+            conn = hikariDataSource.getConnection();
+
+            // Use type handler to create the statement with database-specific 
settings
+            stmt = typeHandler.initializeStatement(conn, querySql, batchSize);
+
+            LOG.info("JdbcJniScanner: Executing query: " + querySql);

Review Comment:
   **Issue: Query SQL logged at INFO level** — This logs every executed query 
at INFO level, which can:
   1. Expose sensitive data in queries (WHERE clauses with PII, etc.)
   2. Create excessive log volume in production with high query rates
   
   This should be logged at `LOG.debug()` level instead; if it must remain at 
INFO, the SQL should be truncated or redacted before logging.



##########
be/src/vec/exec/format/jni_reader.cpp:
##########
@@ -36,36 +43,336 @@ class Block;
 
 namespace doris::vectorized {
 
-MockJniReader::MockJniReader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
-                             RuntimeState* state, RuntimeProfile* profile)
-        : JniReader(file_slot_descs, state, profile) {
-    std::ostringstream required_fields;
-    std::ostringstream columns_types;
-    std::vector<std::string> column_names;
-    int index = 0;
-    for (const auto& desc : _file_slot_descs) {
-        std::string field = desc->col_name();
-        std::string type = 
JniConnector::get_jni_type_with_different_string(desc->type());
-        column_names.emplace_back(field);
-        if (index == 0) {
-            required_fields << field;
-            columns_types << type;
-        } else {
-            required_fields << "," << field;
-            columns_types << "#" << type;
+// =========================================================================
+// JniReader constructors
+// =========================================================================
+
+JniReader::JniReader(const std::vector<SlotDescriptor*>& file_slot_descs, 
RuntimeState* state,
+                     RuntimeProfile* profile, std::string connector_class,
+                     std::map<std::string, std::string> scanner_params,
+                     std::vector<std::string> column_names, int64_t 
self_split_weight)
+        : _file_slot_descs(file_slot_descs),
+          _state(state),
+          _profile(profile),
+          _connector_class(std::move(connector_class)),
+          _scanner_params(std::move(scanner_params)),
+          _column_names(std::move(column_names)),
+          _self_split_weight(static_cast<int32_t>(self_split_weight)) {
+    _connector_name = split(_connector_class, "/").back();
+}
+
+JniReader::JniReader(std::string connector_class, std::map<std::string, 
std::string> scanner_params)
+        : _file_slot_descs(*(new std::vector<SlotDescriptor*>())),

Review Comment:
   **Bug: Memory leak** — This constructor heap-allocates a `std::vector` via 
`new` and stores it as a `const` reference (`_file_slot_descs`), but the 
allocated memory is **never freed**. Every call to this constructor (used by 
Avro schema discovery and `JdbcConnectionTester` paths) leaks one `std::vector` 
object.
   
   Suggested fix: Use an empty static vector or an owned member:
   ```cpp
   // Option 1: static empty vector (if _file_slot_descs is truly unused)
   static const std::vector<SlotDescriptor*> kEmptySlots;
   JniReader::JniReader(std::string connector_class, std::map<std::string, 
std::string> scanner_params)
           : _file_slot_descs(kEmptySlots), ...
   
   // Option 2: Change _file_slot_descs from const& to value type
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to