morningman commented on code in PR #61141:
URL: https://github.com/apache/doris/pull/61141#discussion_r2933228842


##########
be/src/exec/scan/jdbc_scanner.cpp:
##########
@@ -45,16 +43,48 @@ JdbcScanner::JdbcScanner(RuntimeState* state, 
doris::JDBCScanLocalState* local_s
           _tuple_desc(nullptr),
           _table_type(table_type),
           _is_tvf(is_tvf) {
-    _init_profile(local_state->_scanner_profile);
     _has_prepared = false;
 }
 
+std::map<std::string, std::string> JdbcScanner::_build_jdbc_params(
+        const TupleDescriptor* tuple_desc) {
+    const JdbcTableDescriptor* jdbc_table =
+            static_cast<const JdbcTableDescriptor*>(tuple_desc->table_desc());
+
+    std::map<std::string, std::string> params;
+    params["jdbc_url"] = jdbc_table->jdbc_url();
+    params["jdbc_user"] = jdbc_table->jdbc_user();
+    params["jdbc_password"] = jdbc_table->jdbc_passwd();
+    params["jdbc_driver_class"] = jdbc_table->jdbc_driver_class();
+    // Resolve jdbc_driver_url to absolute file:// URL
+    // FE sends just the JAR filename; we need to resolve it to a full path.
+    std::string driver_url;
+    auto resolve_st = 
JdbcUtils::resolve_driver_url(jdbc_table->jdbc_driver_url(), &driver_url);
+    if (!resolve_st.ok()) {
+        LOG(WARNING) << "Failed to resolve JDBC driver URL: " << 
resolve_st.to_string();
+        driver_url = jdbc_table->jdbc_driver_url();
+    }
+    params["jdbc_driver_url"] = driver_url;
+    params["query_sql"] = _query_string;
+    params["catalog_id"] = std::to_string(jdbc_table->jdbc_catalog_id());
+    params["table_type"] = _odbc_table_type_to_string(_table_type);
+    params["connection_pool_min_size"] = 
std::to_string(jdbc_table->connection_pool_min_size());
+    params["connection_pool_max_size"] = 
std::to_string(jdbc_table->connection_pool_max_size());
+    params["connection_pool_max_wait_time"] =
+            std::to_string(jdbc_table->connection_pool_max_wait_time());
+    params["connection_pool_max_life_time"] =
+            std::to_string(jdbc_table->connection_pool_max_life_time());
+    params["connection_pool_keep_alive"] =
+            jdbc_table->connection_pool_keep_alive() ? "true" : "false";
+    return params;
+}

Review Comment:
   fixed



##########
be/src/util/jdbc_utils.cpp:
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/jdbc_utils.h"
+
+#include <filesystem>
+
+#include "common/config.h"
+
+namespace doris {
+
+Status JdbcUtils::resolve_driver_url(const std::string& url, std::string* 
result_url) {
+    // Already a full URL (e.g. "file:///path/to/driver.jar" or "hdfs://...")

Review Comment:
   fixed



##########
be/src/format/table/jdbc_jni_reader.cpp:
##########
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "jdbc_jni_reader.h"
+
+#include <sstream>
+
+#include "core/block/block.h"
+#include "core/column/column_nullable.h"
+#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_string.h"
+#include "core/types.h"
+#include "exprs/function/simple_function_factory.h"
+#include "format/jni/jni_data_bridge.h"
+#include "runtime/descriptors.h"
+#include "util/jdbc_utils.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+
+JdbcJniReader::JdbcJniReader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                             RuntimeState* state, RuntimeProfile* profile,
+                             const std::map<std::string, std::string>& 
jdbc_params)
+        : JniReader(
+                  file_slot_descs, state, profile, 
"org/apache/doris/jdbc/JdbcJniScanner",
+                  [&]() {
+                      std::ostringstream required_fields;
+                      std::ostringstream columns_types;
+                      std::ostringstream replace_string;
+                      int index = 0;
+                      for (const auto& desc : file_slot_descs) {
+                          std::string field = desc->col_name();
+                          std::string type =
+                                  
JniDataBridge::get_jni_type_with_different_string(desc->type());
+
+                          // Determine replace_string for special types
+                          // (bitmap, hll, quantile_state, jsonb)
+                          std::string replace_type = "not_replace";
+                          auto ptype = desc->type()->get_primitive_type();
+                          if (ptype == PrimitiveType::TYPE_BITMAP) {
+                              replace_type = "bitmap";

Review Comment:
   fixed



##########
fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcJniWriter.java:
##########
@@ -0,0 +1,394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jdbc;
+
+import org.apache.doris.cloud.security.SecurityChecker;
+import org.apache.doris.common.jni.JniWriter;
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.VectorColumn;
+import org.apache.doris.common.jni.vec.VectorTable;
+
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * JdbcJniWriter writes C++ Block data to JDBC targets via PreparedStatement 
batch inserts.
+ * It extends JniWriter to integrate with the unified VJniFormatTransformer 
framework,
+ * following the same pattern as MaxComputeJniWriter.
+ *
+ * <p>Lifecycle (managed by C++ VJniFormatTransformer):
+ * <pre>
+ *   open() -> write() [repeated] -> close()
+ * </pre>
+ *
+ * <p>Transaction control is exposed via getStatistics() responses and
+ * additional JNI method calls from C++ side.
+ *
+ * <p>Parameters (passed via constructor params map):
+ * <ul>
+ *   <li>jdbc_url - JDBC connection URL</li>
+ *   <li>jdbc_user - database user</li>
+ *   <li>jdbc_password - database password</li>
+ *   <li>jdbc_driver_class - JDBC driver class name</li>
+ *   <li>jdbc_driver_url - path to driver JAR</li>
+ *   <li>jdbc_driver_checksum - MD5 checksum of driver JAR</li>
+ *   <li>insert_sql - INSERT SQL with ? placeholders</li>
+ *   <li>use_transaction - "true"/"false"</li>
+ *   <li>catalog_id - catalog ID for connection pool keying</li>
+ *   <li>connection_pool_min_size - min pool size</li>
+ *   <li>connection_pool_max_size - max pool size</li>
+ *   <li>connection_pool_max_wait_time - max wait time (ms)</li>
+ *   <li>connection_pool_max_life_time - max lifetime (ms)</li>
+ *   <li>connection_pool_keep_alive - "true"/"false"</li>
+ * </ul>
+ */
+public class JdbcJniWriter extends JniWriter {
+    private static final Logger LOG = Logger.getLogger(JdbcJniWriter.class);
+
+    private final String jdbcUrl;
+    private final String jdbcUser;
+    private final String jdbcPassword;
+    private final String jdbcDriverClass;
+    private final String jdbcDriverUrl;
+    private final String jdbcDriverChecksum;
+    private final String insertSql;
+    private final boolean useTransaction;
+    private final long catalogId;
+    private final int connectionPoolMinSize;
+    private final int connectionPoolMaxSize;
+    private final int connectionPoolMaxWaitTime;
+    private final int connectionPoolMaxLifeTime;
+    private final boolean connectionPoolKeepAlive;
+
+    private HikariDataSource hikariDataSource = null;
+    private Connection conn = null;
+    private PreparedStatement preparedStatement = null;
+    private ClassLoader classLoader = null;
+
+    // Statistics
+    private long writtenRows = 0;
+    private long insertTime = 0;
+
+    public JdbcJniWriter(int batchSize, Map<String, String> params) {
+        super(batchSize, params);
+        this.jdbcUrl = params.getOrDefault("jdbc_url", "");
+        this.jdbcUser = params.getOrDefault("jdbc_user", "");
+        this.jdbcPassword = params.getOrDefault("jdbc_password", "");
+        this.jdbcDriverClass = params.getOrDefault("jdbc_driver_class", "");
+        this.jdbcDriverUrl = params.getOrDefault("jdbc_driver_url", "");
+        this.jdbcDriverChecksum = params.getOrDefault("jdbc_driver_checksum", 
"");
+        this.insertSql = params.getOrDefault("insert_sql", "");
+        this.useTransaction = 
"true".equalsIgnoreCase(params.getOrDefault("use_transaction", "false"));
+        this.catalogId = Long.parseLong(params.getOrDefault("catalog_id", 
"0"));
+        this.connectionPoolMinSize = 
Integer.parseInt(params.getOrDefault("connection_pool_min_size", "1"));
+        this.connectionPoolMaxSize = 
Integer.parseInt(params.getOrDefault("connection_pool_max_size", "10"));
+        this.connectionPoolMaxWaitTime = Integer.parseInt(
+                params.getOrDefault("connection_pool_max_wait_time", "5000"));
+        this.connectionPoolMaxLifeTime = Integer.parseInt(
+                params.getOrDefault("connection_pool_max_life_time", 
"1800000"));
+        this.connectionPoolKeepAlive = "true".equalsIgnoreCase(
+                params.getOrDefault("connection_pool_keep_alive", "false"));
+    }
+
+    @Override
+    public void open() throws IOException {
+        ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
+        try {
+            initializeClassLoaderAndDataSource();
+
+            conn = hikariDataSource.getConnection();
+
+            if (useTransaction) {
+                conn.setAutoCommit(false);
+            }
+
+            LOG.debug("JdbcJniWriter: Preparing insert statement: " + 
insertSql);
+            preparedStatement = conn.prepareStatement(insertSql);
+        } catch (Exception e) {
+            throw new IOException("JdbcJniWriter open failed: " + 
e.getMessage(), e);
+        } finally {
+            Thread.currentThread().setContextClassLoader(oldClassLoader);
+        }
+    }
+
+    @Override
+    protected void writeInternal(VectorTable inputTable) throws IOException {
+        try {
+            long startInsert = System.nanoTime();
+            int numRows = inputTable.getNumRows();
+            VectorColumn[] columns = inputTable.getColumns();
+
+            for (int i = 0; i < numRows; ++i) {
+                for (int j = 0; j < columns.length; ++j) {
+                    insertColumn(i, j, columns[j]);
+                }
+                preparedStatement.addBatch();
+            }
+            preparedStatement.executeBatch();
+            preparedStatement.clearBatch();
+
+            insertTime += System.nanoTime() - startInsert;
+            writtenRows += numRows;
+        } catch (SQLException e) {
+            throw new IOException("JdbcJniWriter write failed: " + 
e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            // Commit transaction if enabled (C++ side calls commitTrans via 
JNI before close)
+            if (preparedStatement != null && !preparedStatement.isClosed()) {

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to