This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6e675e785e36cb772eaca2897eed0822d1e649a7 Author: Mingyu Chen <morning...@163.com> AuthorDate: Wed Sep 25 09:54:50 2024 +0800 [opt](udf-cache) cache more udf classes (#40404) In PR #34980, we cached the class loader of Java UDFs. However, the `init()` method is still very time-consuming because it calls `Class.forName()` on every invocation. This PR adds a new class, `UdfClassCache`, which caches more of the UDF's metadata so that the `init()` method no longer costs too much time. --- .../doris/common/classloader/ScannerLoader.java | 11 +- .../doris/common/jni/utils/UdfClassCache.java | 41 +++++ .../java/org/apache/doris/udf/UdfExecutor.java | 172 ++++++++++++--------- fe/fe-common/pom.xml | 4 + .../nereids_p0/javaudf/test_javaudf_string.groovy | 28 ++++ 5 files changed, 181 insertions(+), 75 deletions(-) diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java index 2bbbd21838c..0fb9cfd6d12 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java @@ -18,6 +18,7 @@ package org.apache.doris.common.classloader; import org.apache.doris.common.jni.utils.ExpiringMap; +import org.apache.doris.common.jni.utils.UdfClassCache; import com.google.common.collect.Streams; import org.apache.log4j.Logger; @@ -45,7 +46,7 @@ import java.util.stream.Collectors; public class ScannerLoader { public static final Logger LOG = Logger.getLogger(ScannerLoader.class); private static final Map<String, Class<?>> loadedClasses = new HashMap<>(); - private static final ExpiringMap<String, ClassLoader> udfLoadedClasses = new ExpiringMap<String, ClassLoader>(); + private static final ExpiringMap<String, UdfClassCache> udfLoadedClasses = new ExpiringMap<>(); private static final String CLASS_SUFFIX = 
".class"; private static final String LOAD_PACKAGE = "org.apache.doris"; @@ -65,14 +66,14 @@ public class ScannerLoader { }); } - public static ClassLoader getUdfClassLoader(String functionSignature) { + public static UdfClassCache getUdfClassLoader(String functionSignature) { return udfLoadedClasses.get(functionSignature); } - public static synchronized void cacheClassLoader(String functionSignature, ClassLoader classLoader, + public static synchronized void cacheClassLoader(String functionSignature, UdfClassCache classCache, long expirationTime) { - LOG.info("cacheClassLoader for: " + functionSignature); - udfLoadedClasses.put(functionSignature, classLoader, expirationTime * 60 * 1000L); + LOG.info("Cache UDF for: " + functionSignature); + udfLoadedClasses.put(functionSignature, classCache, expirationTime * 60 * 1000L); } public synchronized void cleanUdfClassLoader(String functionSignature) { diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java new file mode 100644 index 00000000000..4515e705214 --- /dev/null +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.jni.utils; + +import com.esotericsoftware.reflectasm.MethodAccess; + +import java.lang.reflect.Method; + +/** + * This class is used for caching the class of UDF. + */ +public class UdfClassCache { + public Class<?> udfClass; + // the index of evaluate() method in the class + public MethodAccess methodAccess; + public int evaluateIndex; + // the method of evaluate() in udf + public Method method; + // the method of prepare() in udf + public Method prepareMethod; + // the argument and return's JavaUdfDataType of evaluate() method. + public JavaUdfDataType[] argTypes; + public JavaUdfDataType retType; + // the class type of the arguments in evaluate() method + public Class[] argClass; +} diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java index ef120b2b913..685e20de843 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java @@ -20,8 +20,10 @@ package org.apache.doris.udf; import org.apache.doris.catalog.Type; import org.apache.doris.common.Pair; import org.apache.doris.common.classloader.ScannerLoader; +import org.apache.doris.common.exception.InternalException; import org.apache.doris.common.exception.UdfRuntimeException; import org.apache.doris.common.jni.utils.JavaUdfDataType; +import org.apache.doris.common.jni.utils.UdfClassCache; import 
org.apache.doris.common.jni.utils.UdfUtils; import org.apache.doris.common.jni.vec.ColumnValueConverter; import org.apache.doris.common.jni.vec.VectorTable; @@ -29,6 +31,7 @@ import org.apache.doris.thrift.TJavaUdfExecutorCtorParams; import com.esotericsoftware.reflectasm.MethodAccess; import com.google.common.base.Joiner; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import org.apache.log4j.Logger; @@ -140,26 +143,99 @@ public class UdfExecutor extends BaseExecutor { return null; // Method not found } - public ClassLoader getClassLoader(String jarPath, String signature, long expirationTime) - throws MalformedURLException, FileNotFoundException { - ClassLoader loader = null; - if (jarPath == null) { - // for test - loader = ClassLoader.getSystemClassLoader(); - } else { - if (isStaticLoad) { - loader = ScannerLoader.getUdfClassLoader(signature); - } - if (loader == null) { + public UdfClassCache getClassCache(String className, String jarPath, String signature, long expirationTime, + Type funcRetType, Type... parameterTypes) + throws MalformedURLException, FileNotFoundException, ClassNotFoundException, InternalException, + UdfRuntimeException { + UdfClassCache cache = null; + if (isStaticLoad) { + cache = ScannerLoader.getUdfClassLoader(signature); + } + if (cache == null) { + ClassLoader loader; + if (Strings.isNullOrEmpty(jarPath)) { + // if jarPath is empty, which means the UDF jar is located in custom_lib + // and already be loaded when BE start. + // so here we use system class loader to load UDF class. 
+ loader = ClassLoader.getSystemClassLoader(); + } else { ClassLoader parent = getClass().getClassLoader(); classLoader = UdfUtils.getClassLoader(jarPath, parent); loader = classLoader; - if (isStaticLoad) { - ScannerLoader.cacheClassLoader(signature, loader, expirationTime); + } + cache = new UdfClassCache(); + cache.udfClass = Class.forName(className, true, loader); + cache.methodAccess = MethodAccess.get(cache.udfClass); + checkAndCacheUdfClass(className, cache, funcRetType, parameterTypes); + if (isStaticLoad) { + ScannerLoader.cacheClassLoader(signature, cache, expirationTime); + } + } + return cache; + } + + private void checkAndCacheUdfClass(String className, UdfClassCache cache, Type funcRetType, Type... parameterTypes) + throws InternalException, UdfRuntimeException { + ArrayList<String> signatures = Lists.newArrayList(); + Class<?> c = cache.udfClass; + Method[] methods = c.getMethods(); + Method prepareMethod = findPrepareMethod(methods); + if (prepareMethod != null) { + cache.prepareMethod = prepareMethod; + } + for (Method m : methods) { + // By convention, the udf must contain the function "evaluate" + if (!m.getName().equals(UDF_FUNCTION_NAME)) { + continue; + } + signatures.add(m.toGenericString()); + cache.argClass = m.getParameterTypes(); + + // Try to match the arguments + if (cache.argClass.length != parameterTypes.length) { + continue; + } + cache.method = m; + cache.evaluateIndex = cache.methodAccess.getIndex(UDF_FUNCTION_NAME, cache.argClass); + Pair<Boolean, JavaUdfDataType> returnType; + if (cache.argClass.length == 0 && parameterTypes.length == 0) { + // Special case where the UDF doesn't take any input args + returnType = UdfUtils.setReturnType(funcRetType, m.getReturnType()); + if (!returnType.first) { + continue; + } else { + cache.retType = returnType.second; } + cache.argTypes = new JavaUdfDataType[0]; + return; + } + returnType = UdfUtils.setReturnType(funcRetType, m.getReturnType()); + if (!returnType.first) { + continue; + } else { 
+ cache.retType = returnType.second; } + Type keyType = cache.retType.getKeyType(); + Type valueType = cache.retType.getValueType(); + Pair<Boolean, JavaUdfDataType[]> inputType = UdfUtils.setArgTypes(parameterTypes, cache.argClass, false); + if (!inputType.first) { + continue; + } else { + cache.argTypes = inputType.second; + } + cache.retType.setKeyType(keyType); + cache.retType.setValueType(valueType); + return; } - return loader; + StringBuilder sb = new StringBuilder(); + sb.append("Unable to find evaluate function with the correct signature: ") + .append(className) + .append(".evaluate(") + .append(Joiner.on(", ").join(parameterTypes)) + .append(")\n") + .append("UDF contains: \n ") + .append(Joiner.on("\n ").join(signatures)); + throw new UdfRuntimeException(sb.toString()); } // Preallocate the input objects that will be passed to the underlying UDF. @@ -168,7 +244,6 @@ public class UdfExecutor extends BaseExecutor { protected void init(TJavaUdfExecutorCtorParams request, String jarPath, Type funcRetType, Type... 
parameterTypes) throws UdfRuntimeException { String className = request.fn.scalar_fn.symbol; - ArrayList<String> signatures = Lists.newArrayList(); try { if (LOG.isDebugEnabled()) { LOG.debug("Loading UDF '" + className + "' from " + jarPath); @@ -178,66 +253,21 @@ public class UdfExecutor extends BaseExecutor { if (request.getFn().isSetExpirationTime()) { expirationTime = request.getFn().getExpirationTime(); } - ClassLoader loader = getClassLoader(jarPath, request.getFn().getSignature(), expirationTime); - Class<?> c = Class.forName(className, true, loader); - methodAccess = MethodAccess.get(c); - Constructor<?> ctor = c.getConstructor(); + UdfClassCache cache = getClassCache(className, jarPath, request.getFn().getSignature(), expirationTime, + funcRetType, parameterTypes); + methodAccess = cache.methodAccess; + Constructor<?> ctor = cache.udfClass.getConstructor(); udf = ctor.newInstance(); - Method[] methods = c.getMethods(); - Method prepareMethod = findPrepareMethod(methods); + Method prepareMethod = cache.prepareMethod; if (prepareMethod != null) { prepareMethod.invoke(udf); } - for (Method m : methods) { - // By convention, the udf must contain the function "evaluate" - if (!m.getName().equals(UDF_FUNCTION_NAME)) { - continue; - } - signatures.add(m.toGenericString()); - argClass = m.getParameterTypes(); - - // Try to match the arguments - if (argClass.length != parameterTypes.length) { - continue; - } - method = m; - evaluateIndex = methodAccess.getIndex(UDF_FUNCTION_NAME, argClass); - Pair<Boolean, JavaUdfDataType> returnType; - if (argClass.length == 0 && parameterTypes.length == 0) { - // Special case where the UDF doesn't take any input args - returnType = UdfUtils.setReturnType(funcRetType, m.getReturnType()); - if (!returnType.first) { - continue; - } else { - retType = returnType.second; - } - argTypes = new JavaUdfDataType[0]; - return; - } - returnType = UdfUtils.setReturnType(funcRetType, m.getReturnType()); - if (!returnType.first) { - continue; 
- } else { - retType = returnType.second; - } - Pair<Boolean, JavaUdfDataType[]> inputType = UdfUtils.setArgTypes(parameterTypes, argClass, false); - if (!inputType.first) { - continue; - } else { - argTypes = inputType.second; - } - return; - } - StringBuilder sb = new StringBuilder(); - sb.append("Unable to find evaluate function with the correct signature: ") - .append(className) - .append(".evaluate(") - .append(Joiner.on(", ").join(parameterTypes)) - .append(")\n") - .append("UDF contains: \n ") - .append(Joiner.on("\n ").join(signatures)); - throw new UdfRuntimeException(sb.toString()); + argClass = cache.argClass; + method = cache.method; + evaluateIndex = cache.evaluateIndex; + retType = cache.retType; + argTypes = cache.argTypes; } catch (MalformedURLException e) { throw new UdfRuntimeException("Unable to load jar.", e); } catch (SecurityException e) { @@ -255,3 +285,5 @@ public class UdfExecutor extends BaseExecutor { } } } + + diff --git a/fe/fe-common/pom.xml b/fe/fe-common/pom.xml index 458cb6ff9e4..eb6b673238e 100644 --- a/fe/fe-common/pom.xml +++ b/fe/fe-common/pom.xml @@ -132,6 +132,10 @@ under the License. 
<groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> </dependency> + <dependency> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo-shaded</artifactId> + </dependency> </dependencies> <build> <finalName>doris-fe-common</finalName> diff --git a/regression-test/suites/nereids_p0/javaudf/test_javaudf_string.groovy b/regression-test/suites/nereids_p0/javaudf/test_javaudf_string.groovy index 4d1ecd4c109..d88746f14f3 100644 --- a/regression-test/suites/nereids_p0/javaudf/test_javaudf_string.groovy +++ b/regression-test/suites/nereids_p0/javaudf/test_javaudf_string.groovy @@ -71,6 +71,34 @@ suite("nereids_test_javaudf_string") { qt_select """ SELECT java_udf_string_test(string_col, 2, 3) result FROM ${tableName} ORDER BY result; """ qt_select """ SELECT java_udf_string_test('abcdef', 2, 3), java_udf_string_test('abcdefg', 2, 3) result FROM ${tableName} ORDER BY result; """ + // test multi thread + Thread thread1 = new Thread(() -> { + try { + for (int ii = 0; ii < 100; ii++) { + sql """ SELECT java_udf_string_test(varchar_col, 2, 3) result FROM ${tableName} ORDER BY result; """ + } + } catch (Exception e) { + log.info(e.getMessage()) + Assert.fail(); + } + }) + + Thread thread2 = new Thread(() -> { + try { + for (int ii = 0; ii < 100; ii++) { + sql """ SELECT java_udf_string_test(string_col, 2, 3) result FROM ${tableName} ORDER BY result; """ + } + } catch (Exception e) { + log.info(e.getMessage()) + Assert.fail(); + } + }) + sleep(1000L) + thread1.start() + thread2.start() + + thread1.join() + thread2.join() } finally { try_sql("DROP FUNCTION IF EXISTS java_udf_string_test(string, int, int);") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org