This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 469bc77a429 branch-2.1: [fix](jdbc catalog) Change BE jdbc Driver 
loading to Java code  (#48002)
469bc77a429 is described below

commit 469bc77a429b920116e01135ab7f2f8946f8acff
Author: zy-kkk <zhongy...@gmail.com>
AuthorDate: Fri Feb 21 17:39:26 2025 +0800

    branch-2.1: [fix](jdbc catalog) Change BE jdbc Driver loading to Java code  
(#48002)
    
    cherry-pick from (#46912)
---
 be/src/runtime/user_function_cache.cpp             | 20 ++-----
 be/src/runtime/user_function_cache.h               |  1 -
 be/src/vec/exec/vjdbc_connector.cpp                | 29 ++++-----
 be/src/vec/exec/vjdbc_connector.h                  |  2 +
 .../org/apache/doris/jdbc/BaseJdbcExecutor.java    | 70 +++++++++++++++++++++-
 .../apache/doris/jdbc/JdbcDataSourceConfig.java    | 10 ++++
 .../org/apache/doris/catalog/JdbcResource.java     |  7 ++-
 .../org/apache/doris/catalog/JdbcResourceTest.java | 51 ++++++++++++++++
 gensrc/thrift/Types.thrift                         |  1 +
 9 files changed, 153 insertions(+), 38 deletions(-)

diff --git a/be/src/runtime/user_function_cache.cpp 
b/be/src/runtime/user_function_cache.cpp
index ab9d90846ab..a5f354180b8 100644
--- a/be/src/runtime/user_function_cache.cpp
+++ b/be/src/runtime/user_function_cache.cpp
@@ -272,12 +272,10 @@ Status UserFunctionCache::_download_lib(const 
std::string& url,
         return Status::InternalError("fail to open file");
     }
 
-    std::string real_url = _get_real_url(url);
-
     Md5Digest digest;
     HttpClient client;
     int64_t file_size = 0;
-    RETURN_IF_ERROR(client.init(real_url));
+    RETURN_IF_ERROR(client.init(url));
     Status status;
     auto download_cb = [&status, &tmp_file, &fp, &digest, &file_size](const 
void* data,
                                                                       size_t 
length) {
@@ -297,11 +295,10 @@ Status UserFunctionCache::_download_lib(const 
std::string& url,
     digest.digest();
     if (!iequal(digest.hex(), entry->checksum)) {
         fmt::memory_buffer error_msg;
-        fmt::format_to(
-                error_msg,
-                " The checksum is not equal of {} ({}). The init info of first 
create entry is:"
-                "{} But download file check_sum is: {}, file_size is: {}.",
-                url, real_url, entry->debug_string(), digest.hex(), file_size);
+        fmt::format_to(error_msg,
+                       " The checksum is not equal of {}. The init info of 
first create entry is:"
+                       "{} But download file check_sum is: {}, file_size is: 
{}.",
+                       url, entry->debug_string(), digest.hex(), file_size);
         std::string error(fmt::to_string(error_msg));
         LOG(WARNING) << error;
         return Status::InternalError(error);
@@ -323,13 +320,6 @@ Status UserFunctionCache::_download_lib(const std::string& 
url,
     return Status::OK();
 }
 
-std::string UserFunctionCache::_get_real_url(const std::string& url) {
-    if (url.find(":/") == std::string::npos) {
-        return "file://" + config::jdbc_drivers_dir + "/" + url;
-    }
-    return url;
-}
-
 std::string UserFunctionCache::_get_file_name_from_url(const std::string& url) 
const {
     std::string file_name;
     size_t last_slash_pos = url.find_last_of('/');
diff --git a/be/src/runtime/user_function_cache.h 
b/be/src/runtime/user_function_cache.h
index 5d1bff8b866..93759c261e2 100644
--- a/be/src/runtime/user_function_cache.h
+++ b/be/src/runtime/user_function_cache.h
@@ -72,7 +72,6 @@ private:
                                const std::string& file_name);
     void _destroy_cache_entry(std::shared_ptr<UserFunctionCacheEntry> entry);
 
-    std::string _get_real_url(const std::string& url);
     std::string _get_file_name_from_url(const std::string& url) const;
     std::vector<std::string> _split_string_by_checksum(const std::string& 
file);
 
diff --git a/be/src/vec/exec/vjdbc_connector.cpp 
b/be/src/vec/exec/vjdbc_connector.cpp
index 0fa33bfaad9..203e7934084 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -119,23 +119,7 @@ Status JdbcConnector::open(RuntimeState* state, bool read) 
{
     // Add a scoped cleanup jni reference object. This cleans up local refs 
made below.
     JniLocalFrame jni_frame;
     {
-        std::string local_location;
-        std::hash<std::string> hash_str;
-        auto* function_cache = UserFunctionCache::instance();
-        if (_conn_param.resource_name.empty()) {
-            // for jdbcExternalTable, _conn_param.resource_name == ""
-            // so, we use _conn_param.driver_path as key of jarpath
-            SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer);
-            RETURN_IF_ERROR(function_cache->get_jarpath(
-                    std::abs((int64_t)hash_str(_conn_param.driver_path)), 
_conn_param.driver_path,
-                    _conn_param.driver_checksum, &local_location));
-        } else {
-            SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer);
-            RETURN_IF_ERROR(function_cache->get_jarpath(
-                    std::abs((int64_t)hash_str(_conn_param.resource_name)), 
_conn_param.driver_path,
-                    _conn_param.driver_checksum, &local_location));
-        }
-        VLOG_QUERY << "driver local path = " << local_location;
+        std::string driver_path = _get_real_url(_conn_param.driver_path);
 
         TJdbcExecutorCtorParams ctor_params;
         ctor_params.__set_statement(_sql_str);
@@ -144,7 +128,8 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
         ctor_params.__set_jdbc_user(_conn_param.user);
         ctor_params.__set_jdbc_password(_conn_param.passwd);
         ctor_params.__set_jdbc_driver_class(_conn_param.driver_class);
-        ctor_params.__set_driver_path(local_location);
+        ctor_params.__set_driver_path(driver_path);
+        ctor_params.__set_jdbc_driver_checksum(_conn_param.driver_checksum);
         if (state == nullptr) {
             ctor_params.__set_batch_size(read ? 1 : 0);
         } else {
@@ -601,4 +586,12 @@ jobject JdbcConnector::_get_java_table_type(JNIEnv* env, 
TOdbcTableType::type ta
             env->CallStaticObjectMethod(enumClass, findByValueMethod, 
static_cast<jint>(tableType));
     return javaEnumObj;
 }
+
+std::string JdbcConnector::_get_real_url(const std::string& url) {
+    if (url.find(":/") == std::string::npos) {
+        return "file://" + config::jdbc_drivers_dir + "/" + url;
+    }
+    return url;
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vjdbc_connector.h 
b/be/src/vec/exec/vjdbc_connector.h
index 954b0abfa78..c23dc11c865 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -134,6 +134,8 @@ private:
                                 int rows);
     jobject _get_java_table_type(JNIEnv* env, TOdbcTableType::type tableType);
 
+    std::string _get_real_url(const std::string& url);
+
     bool _closed = false;
     jclass _executor_factory_clazz;
     jclass _executor_clazz;
diff --git 
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
index 56b7865dbb1..383f1fe9aa1 100644
--- 
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
+++ 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
@@ -18,7 +18,6 @@
 package org.apache.doris.jdbc;
 
 import org.apache.doris.common.exception.InternalException;
-import org.apache.doris.common.jni.utils.UdfUtils;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ColumnValueConverter;
 import org.apache.doris.common.jni.vec.VectorColumn;
@@ -27,7 +26,9 @@ import org.apache.doris.thrift.TJdbcExecutorCtorParams;
 import org.apache.doris.thrift.TJdbcOperation;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.codec.binary.Hex;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
@@ -35,8 +36,15 @@ import org.apache.thrift.protocol.TBinaryProtocol;
 import org.semver4j.Semver;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Array;
 import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.net.URLConnection;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.Date;
@@ -57,6 +65,7 @@ public abstract class BaseJdbcExecutor implements 
JdbcExecutor {
     private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new 
TBinaryProtocol.Factory();
     private HikariDataSource hikariDataSource = null;
     private final byte[] hikariDataSourceLock = new byte[0];
+    private ClassLoader classLoader = null;
     private Connection conn = null;
     protected JdbcDataSourceConfig config;
     protected PreparedStatement preparedStatement = null;
@@ -68,6 +77,7 @@ public abstract class BaseJdbcExecutor implements 
JdbcExecutor {
     protected int batchSizeNum = 0;
     protected int curBlockRows = 0;
     protected String jdbcDriverVersion;
+    private static final Map<URL, ClassLoader> classLoaderMap = 
Maps.newConcurrentMap();
 
     public BaseJdbcExecutor(byte[] thriftParams) throws Exception {
         setJdbcDriverSystemProperties();
@@ -85,6 +95,7 @@ public abstract class BaseJdbcExecutor implements 
JdbcExecutor {
                 .setJdbcUrl(request.jdbc_url)
                 .setJdbcDriverUrl(request.driver_path)
                 .setJdbcDriverClass(request.jdbc_driver_class)
+                .setJdbcDriverChecksum(request.jdbc_driver_checksum)
                 .setBatchSize(request.batch_size)
                 .setOp(request.op)
                 .setTableType(request.table_type)
@@ -298,8 +309,7 @@ public abstract class BaseJdbcExecutor implements 
JdbcExecutor {
         ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
         String hikariDataSourceKey = config.createCacheKey();
         try {
-            ClassLoader parent = getClass().getClassLoader();
-            ClassLoader classLoader = 
UdfUtils.getClassLoader(config.getJdbcDriverUrl(), parent);
+            initializeClassLoader(config);
             Thread.currentThread().setContextClassLoader(classLoader);
             hikariDataSource = 
JdbcDataSource.getDataSource().getSource(hikariDataSourceKey);
             if (hikariDataSource == null) {
@@ -357,6 +367,60 @@ public abstract class BaseJdbcExecutor implements 
JdbcExecutor {
         }
     }
 
+    private synchronized void initializeClassLoader(JdbcDataSourceConfig 
config)
+            throws MalformedURLException, FileNotFoundException {
+        try {
+            URL[] urls = {new URL(config.getJdbcDriverUrl())};
+            if (classLoaderMap.containsKey(urls[0])) {
+                this.classLoader = classLoaderMap.get(urls[0]);
+            } else {
+                String expectedChecksum = config.getJdbcDriverChecksum();
+                String actualChecksum = 
computeObjectChecksum(urls[0].toString(), null);
+                if (!expectedChecksum.equals(actualChecksum)) {
+                    throw new RuntimeException("Checksum mismatch for JDBC 
driver.");
+                }
+                ClassLoader parent = getClass().getClassLoader();
+                this.classLoader = URLClassLoader.newInstance(urls, parent);
+                classLoaderMap.put(urls[0], this.classLoader);
+            }
+        } catch (MalformedURLException e) {
+            throw new RuntimeException("Error loading JDBC driver.", e);
+        }
+    }
+
+    public static String computeObjectChecksum(String urlStr, String 
encodedAuthInfo) {
+        try (InputStream inputStream = getInputStreamFromUrl(urlStr, 
encodedAuthInfo, 10000, 10000)) {
+            MessageDigest digest = MessageDigest.getInstance("MD5");
+            byte[] buf = new byte[4096];
+            int bytesRead;
+            while ((bytesRead = inputStream.read(buf)) != -1) {
+                digest.update(buf, 0, bytesRead);
+            }
+            return Hex.encodeHexString(digest.digest());
+        } catch (IOException | NoSuchAlgorithmException e) {
+            throw new RuntimeException("Compute driver checksum from url: " + 
urlStr
+                    + " encountered an error: " + e.getMessage());
+        }
+    }
+
+    public static InputStream getInputStreamFromUrl(String urlStr, String 
encodedAuthInfo, int connectTimeoutMs,
+            int readTimeoutMs) throws IOException {
+        try {
+            URL url = new URL(urlStr);
+            URLConnection conn = url.openConnection();
+
+            if (encodedAuthInfo != null) {
+                conn.setRequestProperty("Authorization", "Basic " + 
encodedAuthInfo);
+            }
+
+            conn.setConnectTimeout(connectTimeoutMs);
+            conn.setReadTimeout(readTimeoutMs);
+            return conn.getInputStream();
+        } catch (Exception e) {
+            throw new IOException("Failed to open URL connection: " + urlStr, 
e);
+        }
+    }
+
     protected void setValidationQuery(HikariDataSource ds) {
         ds.setConnectionTestQuery("SELECT 1");
     }
diff --git 
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java
 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java
index a99377add25..87bb9849fe0 100644
--- 
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java
+++ 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java
@@ -27,6 +27,7 @@ public class JdbcDataSourceConfig {
     private String jdbcPassword;
     private String jdbcDriverUrl;
     private String jdbcDriverClass;
+    private String jdbcDriverChecksum;
     private int batchSize;
     private TJdbcOperation op;
     private TOdbcTableType tableType;
@@ -96,6 +97,15 @@ public class JdbcDataSourceConfig {
         return this;
     }
 
+    public String getJdbcDriverChecksum() {
+        return jdbcDriverChecksum;
+    }
+
+    public JdbcDataSourceConfig setJdbcDriverChecksum(String 
jdbcDriverChecksum) {
+        this.jdbcDriverChecksum = jdbcDriverChecksum;
+        return this;
+    }
+
     public int getBatchSize() {
         return batchSize;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
index cd078b4376a..b5c61f908e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
@@ -277,6 +277,12 @@ public class JdbcResource extends Resource {
     }
 
     public static String getFullDriverUrl(String driverUrl) throws 
IllegalArgumentException {
+        if (!(driverUrl.startsWith("file://") || 
driverUrl.startsWith("http://";)
+                || driverUrl.startsWith("https://";) || 
driverUrl.matches("^[^:/]+\\.jar$"))) {
+            throw new IllegalArgumentException("Invalid driver URL format. 
Supported formats are: "
+                    + "file://xxx.jar, http://xxx.jar, https://xxx.jar, or 
xxx.jar (without prefix).");
+        }
+
         try {
             URI uri = new URI(driverUrl);
             String schema = uri.getScheme();
@@ -481,4 +487,3 @@ public class JdbcResource extends Resource {
         }
     }
 }
-
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java
index 81c2157686a..3cd8f872ce9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java
@@ -35,6 +35,7 @@ import mockit.Mocked;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 
 import java.util.Map;
 
@@ -216,4 +217,54 @@ public class JdbcResourceTest {
         });
         Assert.assertEquals("Driver URL does not match any allowed paths: 
file:///postgresql-42.5.0.jar", exception.getMessage());
     }
+
+    @Test
+    public void testValidDriverUrls() {
+        String fileUrl = "file://path/to/driver.jar";
+        Assertions.assertDoesNotThrow(() -> {
+            String result = JdbcResource.getFullDriverUrl(fileUrl);
+            Assert.assertEquals(fileUrl, result);
+        });
+
+        String httpUrl = "http://example.com/driver.jar";;
+        Assertions.assertDoesNotThrow(() -> {
+            String result = JdbcResource.getFullDriverUrl(httpUrl);
+            Assert.assertEquals(httpUrl, result);
+        });
+
+        String httpsUrl = "https://example.com/driver.jar";;
+        Assertions.assertDoesNotThrow(() -> {
+            String result = JdbcResource.getFullDriverUrl(httpsUrl);
+            Assert.assertEquals(httpsUrl, result);
+        });
+
+        String jarFile = "driver.jar";
+        Assertions.assertDoesNotThrow(() -> {
+            String result = JdbcResource.getFullDriverUrl(jarFile);
+            Assert.assertTrue(result.startsWith("file://"));
+        });
+    }
+
+    @Test
+    public void testInvalidDriverUrls() {
+        String invalidUrl1 = "/mnt/path/to/driver.jar";
+        Assert.assertThrows(IllegalArgumentException.class, () -> {
+            JdbcResource.getFullDriverUrl(invalidUrl1);
+        });
+
+        String invalidUrl2 = "ftp://example.com/driver.jar";;
+        Assert.assertThrows(IllegalArgumentException.class, () -> {
+            JdbcResource.getFullDriverUrl(invalidUrl2);
+        });
+
+        String invalidUrl3 = "";
+        Assert.assertThrows(IllegalArgumentException.class, () -> {
+            JdbcResource.getFullDriverUrl(invalidUrl3);
+        });
+
+        String invalidUrl4 = "example.com/driver";
+        Assert.assertThrows(IllegalArgumentException.class, () -> {
+            JdbcResource.getFullDriverUrl(invalidUrl4);
+        });
+    }
 }
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 89a67fdba15..336988e4b88 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -436,6 +436,7 @@ struct TJdbcExecutorCtorParams {
   14: optional i32 connection_pool_cache_clear_time
   15: optional bool connection_pool_keep_alive
   16: optional i64 catalog_id
+  17: optional string jdbc_driver_checksum
 }
 
 struct TJavaUdfExecutorCtorParams {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to