This is an automated email from the ASF dual-hosted git repository.
zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 61ca8bd9bc6 [refact](udf) remove the udf cache expiration_time
property (#63897)
61ca8bd9bc6 is described below
commit 61ca8bd9bc6d7513a62b69c9c6b7e89e22d175be
Author: zhangstar333 <[email protected]>
AuthorDate: Tue Jun 2 13:01:12 2026 +0800
[refact](udf) remove the udf cache expiration_time property (#63897)
### What problem does this PR solve?
Problem Summary:
doc https://github.com/apache/doris-website/pull/3845
```
CREATE FUNCTION print_12() RETURNS int
PROPERTIES (
"file" = "file:///path/to/java-udf-demo-jar-with-dependencies.jar",
"symbol" = "org.apache.doris.udf.Print",
"always_nullable"="true",
"type" = "JAVA_UDF",
"static_load" = "true", // default value is false
"expiration_time" = "60" // default value is 360 minutes
);
```
```
Previously, a Java UDF could use static_load and expiration_time to
control how long its jar stayed cached in BE.
A background thread scanned the cached jars every ten minutes, checked each
jar's init time, and dropped the jar once that time had expired.
This could cause long-running queries to fail when the background thread
removed a jar that was still in use.
Now expiration_time is removed, and the cached jar is cleaned up immediately
when the function is dropped.
```
---
.../doris/common/classloader/ScannerLoader.java | 72 +++++++++++++--
.../apache/doris/common/jni/utils/ExpiringMap.java | 100 ---------------------
.../java/org/apache/doris/udf/BaseExecutor.java | 17 +++-
3 files changed, 78 insertions(+), 111 deletions(-)
diff --git
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
index 1c5874031d4..f8a119efaa9 100644
---
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
+++
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
@@ -17,7 +17,6 @@
package org.apache.doris.common.classloader;
-import org.apache.doris.common.jni.utils.ExpiringMap;
import org.apache.doris.common.jni.utils.UdfClassCache;
import com.google.common.collect.Streams;
@@ -37,6 +36,7 @@ import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;
@@ -88,7 +88,21 @@ public class ScannerLoader {
public static final Logger LOG = LogManager.getLogger(ScannerLoader.class);
private static final Map<String, Class<?>> loadedClasses = new HashMap<>();
- private static final ExpiringMap<String, UdfClassCache> udfLoadedClasses =
new ExpiringMap<>();
+ // Cache of UDF class metadata (including the URLClassLoader used to load
the UDF).
+ // Entries are inserted on first use and only ever removed by an explicit
+ // cleanUdfClassLoader() call (triggered by FE on DROP FUNCTION). There is
intentionally
+ // no time-based eviction: that previously caused two issues —
+ // 1) closing a URLClassLoader while another thread was still loading
classes from it
+ // led to NoClassDefFoundError;
+ // 2) rebuilding a fresh URLClassLoader on every eviction produced
multiple coexisting
+ // ClassLoaders for the same UDF, which broke lazy class resolution
and reflective
+ // lookups inside user UDF code.
+ // NOTE: a cache miss in BaseExecutor.getClassCache() is NOT only
reachable after
+ // cleanUdfClassLoader() — concurrent first-time loads of the same
signature can also
+ // both observe a miss. cacheClassLoader() must therefore insert
atomically via
+ // putIfAbsent and must never close a cache that was already published to
the map,
+ // because another executor may already be holding it.
+ private static final Map<String, UdfClassCache> udfLoadedClasses = new
ConcurrentHashMap<>();
private static final String CLASS_SUFFIX = ".class";
private static final String LOAD_PACKAGE = "org.apache.doris";
@@ -116,15 +130,57 @@ public class ScannerLoader {
return udfLoadedClasses.get(functionSignature);
}
- public static synchronized void cacheClassLoader(String functionSignature,
UdfClassCache classCache,
+ /**
+ * Cache the UDF class metadata for the given function signature.
+ *
+ * <p>Insertion is atomic via {@link Map#putIfAbsent}: if another executor
thread has
+ * already published a cache entry for {@code functionSignature}, the
{@code classCache}
+ * argument is treated as a redundant build and closed here (it has not
yet been handed
+ * to any executor, so closing its URLClassLoader is safe). The
already-published entry
+ * is returned to the caller so the current executor can switch to it.</p>
+ *
+ * <p>The {@code expirationTime} parameter is kept for backward
compatibility with the
+ * existing call sites and DDL property {@code expiration_time}, but is no
longer used:
+ * cached entries are not evicted by time. Removal happens only via
+ * {@link #cleanUdfClassLoader(String)} on DROP FUNCTION.</p>
+ *
+ * @return the {@link UdfClassCache} actually held in the map after this
call —
+ * either {@code classCache} (we won the race) or the pre-existing
entry
+ * (another thread won; {@code classCache} has been closed and
must not be used).
+ */
+ public static UdfClassCache cacheClassLoader(String functionSignature,
UdfClassCache classCache,
long expirationTime) {
- LOG.info("Cache UDF for: {}", functionSignature);
- udfLoadedClasses.put(functionSignature, classCache, expirationTime *
60 * 1000L);
+ LOG.info("Cache UDF for: " + functionSignature);
+ UdfClassCache existing =
udfLoadedClasses.putIfAbsent(functionSignature, classCache);
+ if (existing == null) {
+ return classCache;
+ }
+ // Lost the race against a concurrent first-time load. The cache we
just built has
+ // never been exposed to any executor, so closing its URLClassLoader
here cannot
+ // affect anyone. Do NOT touch `existing` — another executor may
already be using it.
+ try {
+ classCache.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close redundant UdfClassCache for " +
functionSignature, e);
+ }
+ return existing;
}
- public synchronized void cleanUdfClassLoader(String functionSignature) {
- LOG.info("cleanUdfClassLoader for: {}", functionSignature);
- udfLoadedClasses.remove(functionSignature);
+ public void cleanUdfClassLoader(String functionSignature) {
+ LOG.info("cleanUdfClassLoader for: " + functionSignature);
+ UdfClassCache removed = udfLoadedClasses.remove(functionSignature);
+ if (removed != null) {
+ // Immediately close the URLClassLoader. NOTE: any in-flight query
still holding a
+ // reference to this cache (e.g. via JNIContext.executor) will
fail with
+ // NoClassDefFoundError on lazy class resolution after this point.
This is the
+ // accepted semantic of DROP FUNCTION: the function is gone,
queries against it
+ // are expected to fail.
+ try {
+ removed.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close UdfClassCache for " +
functionSignature, e);
+ }
+ }
}
/**
diff --git
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java
deleted file mode 100644
index 3496d5bbb63..00000000000
---
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java
+++ /dev/null
@@ -1,100 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.jni.utils;
-
-import org.apache.log4j.Logger;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class ExpiringMap<K, V> {
- private final ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>(); //
key --> value
- private final ConcurrentHashMap<K, Long> ttlMap = new
ConcurrentHashMap<>(); // key --> ttl interval
- // key --> expirationTime(ttl interval + currentTimeMillis)
- private final ConcurrentHashMap<K, Long> expirationMap = new
ConcurrentHashMap<>();
- private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
- private static final long DEFAULT_INTERVAL_TIME = 10 * 60 * 1000L; // 10
minutes
- public static final Logger LOG = Logger.getLogger(ExpiringMap.class);
-
- public ExpiringMap() {
- startExpirationTask();
- }
-
- public void put(K key, V value, long expirationTimeMs) {
- long expirationTime = System.currentTimeMillis() + expirationTimeMs;
- map.put(key, value);
- expirationMap.put(key, expirationTime);
- ttlMap.put(key, expirationTimeMs);
- }
-
- public V get(K key) {
- Long expirationTime = expirationMap.get(key);
- if (expirationTime == null || System.currentTimeMillis() >
expirationTime) {
- remove(key);
- return null;
- }
- // reset time again
- long ttl = ttlMap.get(key);
- long newExpirationTime = System.currentTimeMillis() + ttl;
- expirationMap.put(key, newExpirationTime);
- return map.get(key);
- }
-
- private void startExpirationTask() {
- scheduler.scheduleAtFixedRate(() -> {
- long now = System.currentTimeMillis();
- for (K key : expirationMap.keySet()) {
- if (expirationMap.get(key) <= now) {
- remove(key);
- }
- }
- }, DEFAULT_INTERVAL_TIME, DEFAULT_INTERVAL_TIME, TimeUnit.MINUTES);
- }
-
- public void remove(K key) {
- V value = map.remove(key);
- expirationMap.remove(key);
- ttlMap.remove(key);
-
- // Uniformly release resources for any AutoCloseable value,
- if (value instanceof AutoCloseable) {
- try {
- ((AutoCloseable) value).close();
- } catch (Exception e) {
- LOG.warn("Failed to close cached resource: " + key, e);
- }
- }
- }
-
- public int size() {
- return map.size();
- }
-
- public void shutdown() {
- scheduler.shutdown();
- try {
- if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
- scheduler.shutdownNow();
- }
- } catch (InterruptedException e) {
- scheduler.shutdownNow();
- }
- }
-}
diff --git
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
index 0967e9fde0b..6356576baaf 100644
---
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
+++
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
@@ -138,8 +138,12 @@ public abstract class BaseExecutor {
UdfClassCache cache = null;
if (isStaticLoad) {
cache = ScannerLoader.getUdfClassLoader(signature);
- if (cache != null && cache.classLoader != null) {
- // Reuse the cached classLoader to ensure dependent classes
can be loaded
+ if (cache != null) {
+ // Reuse the cached classLoader to ensure dependent classes
can be loaded.
+ // NOTE: cache.classLoader may be null when the UDF was
originally loaded via
+ // the system class loader (jarPath empty / custom_lib UDF);
see
+ // UdfClassCache#classLoader. A null value here is a valid
cached state and
+ // must NOT trigger a rebuild — only an actual cache miss does.
classLoader = cache.classLoader;
}
}
@@ -162,7 +166,14 @@ public abstract class BaseExecutor {
cache.classLoader = classLoader;
checkAndCacheUdfClass(cache, funcRetType, parameterTypes);
if (isStaticLoad) {
- ScannerLoader.cacheClassLoader(signature, cache,
expirationTime);
+ UdfClassCache effective =
ScannerLoader.cacheClassLoader(signature, cache, expirationTime);
+ if (effective != cache) {
+ // Another thread won the publish race. Our locally-built
cache (and its
+ // URLClassLoader) was already closed inside
cacheClassLoader(); switch to
+ // the published one so we share its live classLoader.
+ cache = effective;
+ classLoader = cache.classLoader;
+ }
}
}
return cache;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]