This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6e675e785e36cb772eaca2897eed0822d1e649a7
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Wed Sep 25 09:54:50 2024 +0800

    [opt](udf-cache) cache more udf classes (#40404)
    
    In PR #34980, we cached the class loader of Java UDFs.
    But the init() method is still very time-consuming,
    because it still calls `Class.forName()`.
    
    This PR adds a new class `UdfClassCache` that caches more things
    so that the `init()` method will not take too much time.
---
 .../doris/common/classloader/ScannerLoader.java    |  11 +-
 .../doris/common/jni/utils/UdfClassCache.java      |  41 +++++
 .../java/org/apache/doris/udf/UdfExecutor.java     | 172 ++++++++++++---------
 fe/fe-common/pom.xml                               |   4 +
 .../nereids_p0/javaudf/test_javaudf_string.groovy  |  28 ++++
 5 files changed, 181 insertions(+), 75 deletions(-)

diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
index 2bbbd21838c..0fb9cfd6d12 100644
--- 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
@@ -18,6 +18,7 @@
 package org.apache.doris.common.classloader;
 
 import org.apache.doris.common.jni.utils.ExpiringMap;
+import org.apache.doris.common.jni.utils.UdfClassCache;
 
 import com.google.common.collect.Streams;
 import org.apache.log4j.Logger;
@@ -45,7 +46,7 @@ import java.util.stream.Collectors;
 public class ScannerLoader {
     public static final Logger LOG = Logger.getLogger(ScannerLoader.class);
     private static final Map<String, Class<?>> loadedClasses = new HashMap<>();
-    private static final ExpiringMap<String, ClassLoader> udfLoadedClasses = 
new ExpiringMap<String, ClassLoader>();
+    private static final ExpiringMap<String, UdfClassCache> udfLoadedClasses = 
new ExpiringMap<>();
     private static final String CLASS_SUFFIX = ".class";
     private static final String LOAD_PACKAGE = "org.apache.doris";
 
@@ -65,14 +66,14 @@ public class ScannerLoader {
         });
     }
 
-    public static ClassLoader getUdfClassLoader(String functionSignature) {
+    public static UdfClassCache getUdfClassLoader(String functionSignature) {
         return udfLoadedClasses.get(functionSignature);
     }
 
-    public static synchronized void cacheClassLoader(String functionSignature, 
ClassLoader classLoader,
+    public static synchronized void cacheClassLoader(String functionSignature, 
UdfClassCache classCache,
             long expirationTime) {
-        LOG.info("cacheClassLoader for: " + functionSignature);
-        udfLoadedClasses.put(functionSignature, classLoader, expirationTime * 
60 * 1000L);
+        LOG.info("Cache UDF for: " + functionSignature);
+        udfLoadedClasses.put(functionSignature, classCache, expirationTime * 
60 * 1000L);
     }
 
     public synchronized void cleanUdfClassLoader(String functionSignature) {
diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java
new file mode 100644
index 00000000000..4515e705214
--- /dev/null
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.jni.utils;
+
+import com.esotericsoftware.reflectasm.MethodAccess;
+
+import java.lang.reflect.Method;
+
+/**
+ * This class is used for caching the class of UDF.
+ */
+public class UdfClassCache {
+    public Class<?> udfClass;
+    // the index of evaluate() method in the class
+    public MethodAccess methodAccess;
+    public int evaluateIndex;
+    // the method of evaluate() in udf
+    public Method method;
+    // the method of prepare() in udf
+    public Method prepareMethod;
+    // the argument and return's JavaUdfDataType of evaluate() method.
+    public JavaUdfDataType[] argTypes;
+    public JavaUdfDataType retType;
+    // the class type of the arguments in evaluate() method
+    public Class[] argClass;
+}
diff --git 
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
 
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
index ef120b2b913..685e20de843 100644
--- 
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
+++ 
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
@@ -20,8 +20,10 @@ package org.apache.doris.udf;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.classloader.ScannerLoader;
+import org.apache.doris.common.exception.InternalException;
 import org.apache.doris.common.exception.UdfRuntimeException;
 import org.apache.doris.common.jni.utils.JavaUdfDataType;
+import org.apache.doris.common.jni.utils.UdfClassCache;
 import org.apache.doris.common.jni.utils.UdfUtils;
 import org.apache.doris.common.jni.vec.ColumnValueConverter;
 import org.apache.doris.common.jni.vec.VectorTable;
@@ -29,6 +31,7 @@ import org.apache.doris.thrift.TJavaUdfExecutorCtorParams;
 
 import com.esotericsoftware.reflectasm.MethodAccess;
 import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.log4j.Logger;
 
@@ -140,26 +143,99 @@ public class UdfExecutor extends BaseExecutor {
         return null; // Method not found
     }
 
-    public ClassLoader getClassLoader(String jarPath, String signature, long 
expirationTime)
-            throws MalformedURLException, FileNotFoundException {
-        ClassLoader loader = null;
-        if (jarPath == null) {
-            // for test
-            loader = ClassLoader.getSystemClassLoader();
-        } else {
-            if (isStaticLoad) {
-                loader = ScannerLoader.getUdfClassLoader(signature);
-            }
-            if (loader == null) {
+    public UdfClassCache getClassCache(String className, String jarPath, 
String signature, long expirationTime,
+            Type funcRetType, Type... parameterTypes)
+            throws MalformedURLException, FileNotFoundException, 
ClassNotFoundException, InternalException,
+            UdfRuntimeException {
+        UdfClassCache cache = null;
+        if (isStaticLoad) {
+            cache = ScannerLoader.getUdfClassLoader(signature);
+        }
+        if (cache == null) {
+            ClassLoader loader;
+            if (Strings.isNullOrEmpty(jarPath)) {
+                // if jarPath is empty, which means the UDF jar is located in 
custom_lib
+                // and already be loaded when BE start.
+                // so here we use system class loader to load UDF class.
+                loader = ClassLoader.getSystemClassLoader();
+            } else {
                 ClassLoader parent = getClass().getClassLoader();
                 classLoader = UdfUtils.getClassLoader(jarPath, parent);
                 loader = classLoader;
-                if (isStaticLoad) {
-                    ScannerLoader.cacheClassLoader(signature, loader, 
expirationTime);
+            }
+            cache = new UdfClassCache();
+            cache.udfClass = Class.forName(className, true, loader);
+            cache.methodAccess = MethodAccess.get(cache.udfClass);
+            checkAndCacheUdfClass(className, cache, funcRetType, 
parameterTypes);
+            if (isStaticLoad) {
+                ScannerLoader.cacheClassLoader(signature, cache, 
expirationTime);
+            }
+        }
+        return cache;
+    }
+
+    private void checkAndCacheUdfClass(String className, UdfClassCache cache, 
Type funcRetType, Type... parameterTypes)
+            throws InternalException, UdfRuntimeException {
+        ArrayList<String> signatures = Lists.newArrayList();
+        Class<?> c = cache.udfClass;
+        Method[] methods = c.getMethods();
+        Method prepareMethod = findPrepareMethod(methods);
+        if (prepareMethod != null) {
+            cache.prepareMethod = prepareMethod;
+        }
+        for (Method m : methods) {
+            // By convention, the udf must contain the function "evaluate"
+            if (!m.getName().equals(UDF_FUNCTION_NAME)) {
+                continue;
+            }
+            signatures.add(m.toGenericString());
+            cache.argClass = m.getParameterTypes();
+
+            // Try to match the arguments
+            if (cache.argClass.length != parameterTypes.length) {
+                continue;
+            }
+            cache.method = m;
+            cache.evaluateIndex = 
cache.methodAccess.getIndex(UDF_FUNCTION_NAME, cache.argClass);
+            Pair<Boolean, JavaUdfDataType> returnType;
+            if (cache.argClass.length == 0 && parameterTypes.length == 0) {
+                // Special case where the UDF doesn't take any input args
+                returnType = UdfUtils.setReturnType(funcRetType, 
m.getReturnType());
+                if (!returnType.first) {
+                    continue;
+                } else {
+                    cache.retType = returnType.second;
                 }
+                cache.argTypes = new JavaUdfDataType[0];
+                return;
+            }
+            returnType = UdfUtils.setReturnType(funcRetType, 
m.getReturnType());
+            if (!returnType.first) {
+                continue;
+            } else {
+                cache.retType = returnType.second;
             }
+            Type keyType = cache.retType.getKeyType();
+            Type valueType = cache.retType.getValueType();
+            Pair<Boolean, JavaUdfDataType[]> inputType = 
UdfUtils.setArgTypes(parameterTypes, cache.argClass, false);
+            if (!inputType.first) {
+                continue;
+            } else {
+                cache.argTypes = inputType.second;
+            }
+            cache.retType.setKeyType(keyType);
+            cache.retType.setValueType(valueType);
+            return;
         }
-        return loader;
+        StringBuilder sb = new StringBuilder();
+        sb.append("Unable to find evaluate function with the correct 
signature: ")
+                         .append(className)
+                         .append(".evaluate(")
+                         .append(Joiner.on(", ").join(parameterTypes))
+                         .append(")\n")
+                         .append("UDF contains: \n    ")
+                         .append(Joiner.on("\n    ").join(signatures));
+        throw new UdfRuntimeException(sb.toString());
     }
 
     // Preallocate the input objects that will be passed to the underlying UDF.
@@ -168,7 +244,6 @@ public class UdfExecutor extends BaseExecutor {
     protected void init(TJavaUdfExecutorCtorParams request, String jarPath, 
Type funcRetType,
             Type... parameterTypes) throws UdfRuntimeException {
         String className = request.fn.scalar_fn.symbol;
-        ArrayList<String> signatures = Lists.newArrayList();
         try {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Loading UDF '" + className + "' from " + jarPath);
@@ -178,66 +253,21 @@ public class UdfExecutor extends BaseExecutor {
             if (request.getFn().isSetExpirationTime()) {
                 expirationTime = request.getFn().getExpirationTime();
             }
-            ClassLoader loader = getClassLoader(jarPath, 
request.getFn().getSignature(), expirationTime);
-            Class<?> c = Class.forName(className, true, loader);
-            methodAccess = MethodAccess.get(c);
-            Constructor<?> ctor = c.getConstructor();
+            UdfClassCache cache = getClassCache(className, jarPath, 
request.getFn().getSignature(), expirationTime,
+                    funcRetType, parameterTypes);
+            methodAccess = cache.methodAccess;
+            Constructor<?> ctor = cache.udfClass.getConstructor();
             udf = ctor.newInstance();
-            Method[] methods = c.getMethods();
-            Method prepareMethod = findPrepareMethod(methods);
+            Method prepareMethod = cache.prepareMethod;
             if (prepareMethod != null) {
                 prepareMethod.invoke(udf);
             }
-            for (Method m : methods) {
-                // By convention, the udf must contain the function "evaluate"
-                if (!m.getName().equals(UDF_FUNCTION_NAME)) {
-                    continue;
-                }
-                signatures.add(m.toGenericString());
-                argClass = m.getParameterTypes();
-
-                // Try to match the arguments
-                if (argClass.length != parameterTypes.length) {
-                    continue;
-                }
-                method = m;
-                evaluateIndex = methodAccess.getIndex(UDF_FUNCTION_NAME, 
argClass);
-                Pair<Boolean, JavaUdfDataType> returnType;
-                if (argClass.length == 0 && parameterTypes.length == 0) {
-                    // Special case where the UDF doesn't take any input args
-                    returnType = UdfUtils.setReturnType(funcRetType, 
m.getReturnType());
-                    if (!returnType.first) {
-                        continue;
-                    } else {
-                        retType = returnType.second;
-                    }
-                    argTypes = new JavaUdfDataType[0];
-                    return;
-                }
-                returnType = UdfUtils.setReturnType(funcRetType, 
m.getReturnType());
-                if (!returnType.first) {
-                    continue;
-                } else {
-                    retType = returnType.second;
-                }
-                Pair<Boolean, JavaUdfDataType[]> inputType = 
UdfUtils.setArgTypes(parameterTypes, argClass, false);
-                if (!inputType.first) {
-                    continue;
-                } else {
-                    argTypes = inputType.second;
-                }
-                return;
-            }
 
-            StringBuilder sb = new StringBuilder();
-            sb.append("Unable to find evaluate function with the correct 
signature: ")
-                    .append(className)
-                    .append(".evaluate(")
-                    .append(Joiner.on(", ").join(parameterTypes))
-                    .append(")\n")
-                    .append("UDF contains: \n    ")
-                    .append(Joiner.on("\n    ").join(signatures));
-            throw new UdfRuntimeException(sb.toString());
+            argClass = cache.argClass;
+            method = cache.method;
+            evaluateIndex = cache.evaluateIndex;
+            retType = cache.retType;
+            argTypes = cache.argTypes;
         } catch (MalformedURLException e) {
             throw new UdfRuntimeException("Unable to load jar.", e);
         } catch (SecurityException e) {
@@ -255,3 +285,5 @@ public class UdfExecutor extends BaseExecutor {
         }
     }
 }
+
+
diff --git a/fe/fe-common/pom.xml b/fe/fe-common/pom.xml
index 458cb6ff9e4..eb6b673238e 100644
--- a/fe/fe-common/pom.xml
+++ b/fe/fe-common/pom.xml
@@ -132,6 +132,10 @@ under the License.
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>kryo-shaded</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <finalName>doris-fe-common</finalName>
diff --git 
a/regression-test/suites/nereids_p0/javaudf/test_javaudf_string.groovy 
b/regression-test/suites/nereids_p0/javaudf/test_javaudf_string.groovy
index 4d1ecd4c109..d88746f14f3 100644
--- a/regression-test/suites/nereids_p0/javaudf/test_javaudf_string.groovy
+++ b/regression-test/suites/nereids_p0/javaudf/test_javaudf_string.groovy
@@ -71,6 +71,34 @@ suite("nereids_test_javaudf_string") {
         qt_select """ SELECT java_udf_string_test(string_col, 2, 3)  result 
FROM ${tableName} ORDER BY result; """
         qt_select """ SELECT java_udf_string_test('abcdef', 2, 3), 
java_udf_string_test('abcdefg', 2, 3) result FROM ${tableName} ORDER BY result; 
"""
 
+        // test multi thread
+        Thread thread1 = new Thread(() -> {
+            try {
+                for (int ii = 0; ii < 100; ii++) {
+                    sql """  SELECT java_udf_string_test(varchar_col, 2, 3) 
result FROM ${tableName} ORDER BY result; """
+                }
+            } catch (Exception e) {
+                log.info(e.getMessage())
+                Assert.fail();
+            }
+        })
+
+        Thread thread2 = new Thread(() -> {
+            try {
+                for (int ii = 0; ii < 100; ii++) {
+                    sql """ SELECT java_udf_string_test(string_col, 2, 3)  
result FROM ${tableName} ORDER BY result; """
+                }
+            } catch (Exception e) {
+                log.info(e.getMessage())
+                Assert.fail();
+            }
+        })
+        sleep(1000L)
+        thread1.start()
+        thread2.start()
+
+        thread1.join()
+        thread2.join()
         
     } finally {
         try_sql("DROP FUNCTION IF EXISTS java_udf_string_test(string, int, 
int);")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to