This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 88672993450939c756a99b2f2c2c05f02865f7a9 Author: Pratik Katti <[email protected]> AuthorDate: Wed Jun 3 02:45:34 2026 +0530 [improve][fn] make built-in functions reload incremental (#25868) (cherry picked from commit d57af8f37725ed16547139978f3a4d83022066c5) --- .../pulsar/functions/worker/FunctionsManager.java | 30 ++-- .../FunctionsManagerReloadFunctionsTest.java | 80 +++++++++++ .../functions/utils/functions/FunctionArchive.java | 28 ++++ .../functions/utils/functions/FunctionUtils.java | 77 ++++++++++ .../utils/functions/ReloadFunctionsResult.java | 29 ++++ .../utils/functions/FunctionUtilsReloadTest.java | 159 +++++++++++++++++++++ 6 files changed, 395 insertions(+), 8 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java index cdd77249502..42841d1d695 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java @@ -21,6 +21,7 @@ package org.apache.pulsar.functions.worker; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.nio.file.Path; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -30,10 +31,11 @@ import org.apache.pulsar.common.functions.FunctionDefinition; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; import org.apache.pulsar.functions.utils.functions.FunctionArchive; import org.apache.pulsar.functions.utils.functions.FunctionUtils; +import org.apache.pulsar.functions.utils.functions.ReloadFunctionsResult; @Slf4j public class FunctionsManager implements AutoCloseable { - private Map<String, FunctionArchive> functions; + private volatile Map<String, FunctionArchive> functions; @VisibleForTesting public FunctionsManager() { @@ -62,32 +64,44 @@ public class FunctionsManager implements AutoCloseable { } public void reloadFunctions(WorkerConfig workerConfig) throws IOException { - Map<String, FunctionArchive> oldFunctions = functions; - this.functions = createFunctions(workerConfig); - closeFunctions(oldFunctions); + ReloadFunctionsResult reload = FunctionUtils.reloadFunctions( + this.functions, + workerConfig.getFunctionsDirectory(), + workerConfig.getNarExtractionDirectory(), + isEnableClassloading(workerConfig)); + this.functions = reload.functions(); + closeFunctions(reload.functionsToClose()); } private static Map<String, FunctionArchive> createFunctions(WorkerConfig workerConfig) throws IOException { - boolean enableClassloading = workerConfig.getEnableClassloadingOfBuiltinFiles() - || ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName()); + boolean enableClassloading = isEnableClassloading(workerConfig); return FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory(), workerConfig.getNarExtractionDirectory(), enableClassloading); } + private static boolean isEnableClassloading(WorkerConfig workerConfig) { + return workerConfig.getEnableClassloadingOfBuiltinFiles() + || ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName()); + } + @Override public void close() { closeFunctions(functions); } - private void closeFunctions(Map<String, FunctionArchive> functionMap) { - functionMap.values().forEach(functionArchive -> { + private void closeFunctions(Collection<FunctionArchive> functions) { + functions.forEach(functionArchive -> { try { functionArchive.close(); } catch (Exception e) { log.warn("Failed to close function archive", e); } }); + } + + private void closeFunctions(Map<String, FunctionArchive> functionMap) { + closeFunctions(functionMap.values()); functionMap.clear(); } } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/FunctionsManagerReloadFunctionsTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/FunctionsManagerReloadFunctionsTest.java new file mode 100644 index 00000000000..0ea2ae2036a --- /dev/null +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/FunctionsManagerReloadFunctionsTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import static org.testng.Assert.assertSame; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; +import org.apache.pulsar.common.functions.FunctionDefinition; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.functions.utils.functions.FunctionArchive; +import org.testng.annotations.Test; + +/** + * Tests {@link FunctionsManager#reloadFunctions(WorkerConfig)} for incremental reload behavior, + * ensuring unchanged functions are reused instead of being recreated. + */ +public class FunctionsManagerReloadFunctionsTest { + + private static void writeMinimalNar(Path narPath, FunctionDefinition def) throws IOException { + byte[] yaml = ObjectMapperFactory.getYamlMapper().getObjectMapper().writeValueAsBytes(def); + try (OutputStream os = Files.newOutputStream(narPath); + ZipOutputStream zos = new ZipOutputStream(os)) { + ZipEntry entry = new ZipEntry("META-INF/services/pulsar-io.yaml"); + zos.putNextEntry(entry); + zos.write(yaml); + zos.closeEntry(); + } + } + + private static FunctionDefinition sampleDefinition(String name) { + FunctionDefinition def = new FunctionDefinition(); + def.setName(name); + def.setFunctionClass("org.example.Function"); + return def; + } + + @Test + public void reloadWhenNarUnchangedReusesSameFunctionArchiveInstance() throws Exception { + Path dir = Files.createTempDirectory("mgr-fn-reload-"); + Path nar = dir.resolve("f1.nar"); + writeMinimalNar(nar, sampleDefinition("f-one")); + + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setFunctionsDirectory(dir.toString()); + workerConfig.setNarExtractionDirectory(NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR); + workerConfig.setEnableClassloadingOfBuiltinFiles(false); + + try (FunctionsManager manager = new FunctionsManager(workerConfig)) { + FunctionArchive before = manager.getFunction("f-one"); + before.getFunctionPackage(); + + manager.reloadFunctions(workerConfig); + + FunctionArchive after = manager.getFunction("f-one"); + assertSame(after, before); + before.getFunctionPackage(); + } + } +} diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java index cfb213f34ed..70b452eba39 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.functions.utils.functions; +import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.file.Path; import org.apache.pulsar.common.functions.FunctionDefinition; import org.apache.pulsar.functions.utils.FunctionFilePackage; @@ -25,6 +27,8 @@ import org.apache.pulsar.functions.utils.ValidatableFunctionPackage; public class FunctionArchive implements AutoCloseable { private final Path archivePath; + /** MD5 hex of archive file contents; empty when {@link #archivePath} is null (test doubles). */ + private final String archiveMd5Hex; private final FunctionDefinition functionDefinition; private final String narExtractionDirectory; private final boolean enableClassloading; @@ -33,16 +37,40 @@ public class FunctionArchive implements AutoCloseable { public FunctionArchive(Path archivePath, FunctionDefinition functionDefinition, String narExtractionDirectory, boolean enableClassloading) { + this(archivePath, functionDefinition, narExtractionDirectory, enableClassloading, null); + } + + /** + * @param precomputedArchiveMd5Hex MD5 hex of {@code archivePath} contents; if null and path is non-null, + * the hash is computed once at construction time. + */ + public FunctionArchive(Path archivePath, FunctionDefinition functionDefinition, String narExtractionDirectory, + boolean enableClassloading, String precomputedArchiveMd5Hex) { this.archivePath = archivePath; this.functionDefinition = functionDefinition; this.narExtractionDirectory = narExtractionDirectory; this.enableClassloading = enableClassloading; + if (archivePath != null) { + try { + this.archiveMd5Hex = precomputedArchiveMd5Hex != null + ? precomputedArchiveMd5Hex + : FunctionUtils.computeArchiveMd5Hex(archivePath); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } else { + this.archiveMd5Hex = ""; + } } public Path getArchivePath() { return archivePath; } + public String getArchiveMd5Hex() { + return archiveMd5Hex; + } + public synchronized ValidatableFunctionPackage getFunctionPackage() { if (closed) { throw new IllegalStateException("FunctionArchive is already closed"); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java index f4d45edf363..1425b2985da 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java @@ -25,12 +25,16 @@ import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HexFormat; +import java.util.List; import java.util.Map; import java.util.TreeMap; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.functions.FunctionDefinition; +import org.apache.pulsar.common.nar.FileUtils; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.zeroturnaround.zip.ZipUtil; @@ -42,6 +46,17 @@ public class FunctionUtils { private static final String PULSAR_IO_SERVICE_NAME = "pulsar-io.yaml"; + /** + * Computes MD5 digest of a file as lower-case hex (for function archive identity on reload). + */ + public static String computeArchiveMd5Hex(Path path) throws IOException { + return calculateMd5Hex(path.toAbsolutePath().normalize().toFile()); + } + + private static String calculateMd5Hex(File file) throws IOException { + return HexFormat.of().formatHex(FileUtils.calculateMd5sum(file)); + } + /** * Extract the Pulsar Function class from a function or archive. */ @@ -107,4 +122,66 @@ public class FunctionUtils { return functions; } + + /** + * Reloads functions from disk against {@code previous}, reusing {@link FunctionArchive} instances when path and + * archive MD5 are unchanged (keeps class loaders open). New or changed archives get new instances. + * <p> + * {@link ReloadFunctionsResult#functionsToClose()} lists function archives evicted from the active set (replaced + * or no longer present on disk); the caller must {@link FunctionArchive#close()} each. + * + * @param previous functions from the previous scan (may be empty, never null) + * @param functionsDirectory same semantics as {@link #searchForFunctions} + * @param narExtractionDirectory same semantics as {@link #searchForFunctions} + * @param enableClassloading same semantics as {@link #searchForFunctions} + * @return new map keyed by function name (reused values are identical instances from {@code previous}) and + * functions the caller should close + */ + public static ReloadFunctionsResult reloadFunctions( + Map<String, FunctionArchive> previous, + String functionsDirectory, + String narExtractionDirectory, + boolean enableClassloading) throws IOException { + + TreeMap<String, FunctionArchive> remaining = new TreeMap<>(previous); + TreeMap<String, FunctionArchive> next = new TreeMap<>(); + List<FunctionArchive> toClose = new ArrayList<>(); + + Path dir = Paths.get(functionsDirectory).toAbsolutePath().normalize(); + if (!dir.toFile().exists()) { + toClose.addAll(remaining.values()); + return new ReloadFunctionsResult(next, toClose); + } + + try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.nar")) { + for (Path archive : stream) { + try { + FunctionDefinition funcDef = FunctionUtils.getFunctionDefinition(archive.toFile()); + if (!StringUtils.isEmpty(funcDef.getFunctionClass())) { + String name = funcDef.getName(); + String md5Hex = computeArchiveMd5Hex(archive); + FunctionArchive prev = remaining.remove(name); + if (prev != null + && prev.getArchivePath() != null + && archive.equals(prev.getArchivePath()) + && md5Hex.equals(prev.getArchiveMd5Hex())) { + next.put(name, prev); + } else { + if (prev != null) { + log.info("Reloading changed function {} from {} (previous archive {})", + name, archive, prev.getArchivePath()); + toClose.add(prev); + } + next.put(name, new FunctionArchive(archive, funcDef, narExtractionDirectory, + enableClassloading, md5Hex)); + } + } + } catch (Throwable t) { + log.warn("Failed to load function from {}", archive, t); + } + } + } + toClose.addAll(remaining.values()); + return new ReloadFunctionsResult(next, toClose); + } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/ReloadFunctionsResult.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/ReloadFunctionsResult.java new file mode 100644 index 00000000000..2eb8c736445 --- /dev/null +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/ReloadFunctionsResult.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.utils.functions; + +import java.util.List; +import java.util.Map; + +/** + * Result of {@link FunctionUtils#reloadFunctions}: the new function map and function archives evicted from the + * active set that the caller must close. + */ +public record ReloadFunctionsResult(Map<String, FunctionArchive> functions, List<FunctionArchive> functionsToClose) { +} diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/functions/FunctionUtilsReloadTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/functions/FunctionUtilsReloadTest.java new file mode 100644 index 00000000000..0dec5f1c613 --- /dev/null +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/functions/FunctionUtilsReloadTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.utils.functions; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotSame; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; +import org.apache.pulsar.common.functions.FunctionDefinition; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.testng.annotations.Test; + +@Test +public class FunctionUtilsReloadTest { + + private static void closeEvicted(ReloadFunctionsResult reload) throws Exception { + for (FunctionArchive functionArchive : reload.functionsToClose()) { + functionArchive.close(); + } + } + + private static void writeMinimalNar(Path narPath, FunctionDefinition def) throws IOException { + byte[] yaml = ObjectMapperFactory.getYamlMapper().getObjectMapper().writeValueAsBytes(def); + try (OutputStream os = Files.newOutputStream(narPath); + ZipOutputStream zos = new ZipOutputStream(os)) { + ZipEntry entry = new ZipEntry("META-INF/services/pulsar-io.yaml"); + zos.putNextEntry(entry); + zos.write(yaml); + zos.closeEntry(); + } + } + + private static FunctionDefinition sampleDefinition(String name) { + FunctionDefinition def = new FunctionDefinition(); + def.setName(name); + def.setFunctionClass("org.example.Function"); + return def; + } + + /** + * Historical {@code FunctionsManager} reload replaced the whole map and closed every prior + * {@link FunctionArchive}, even when NAR files were unchanged. A caller keeping a reference to the + * pre-reload archive would then hit {@link IllegalStateException} on lazy use. + * <p> + * Incremental reload must evict nothing, reuse the same instance, and leave that instance usable + * after the caller closes only {@link ReloadFunctionsResult#functionsToClose()}. + */ + @Test + public void reloadUnchangedNarEvictsNothingAndKeepsSameFunctionArchiveUsable() throws Exception { + Path dir = Files.createTempDirectory("fn-reload-"); + Path nar = dir.resolve("f1.nar"); + writeMinimalNar(nar, sampleDefinition("f-one")); + + Map<String, FunctionArchive> first = + FunctionUtils.searchForFunctions(dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); + FunctionArchive functionArchive = first.get("f-one"); + functionArchive.getFunctionPackage(); + + ReloadFunctionsResult reload = FunctionUtils.reloadFunctions( + first, dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); + assertTrue(reload.functionsToClose().isEmpty()); + closeEvicted(reload); + Map<String, FunctionArchive> second = reload.functions(); + + assertSame(second.get("f-one"), functionArchive); + functionArchive.getFunctionPackage(); + } + + @Test + public void reloadReopensFunctionArchiveWhenNarContentChanges() throws Exception { + Path dir = Files.createTempDirectory("fn-reload-"); + Path nar = dir.resolve("f1.nar"); + writeMinimalNar(nar, sampleDefinition("f-one")); + + Map<String, FunctionArchive> first = + FunctionUtils.searchForFunctions(dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); + FunctionArchive before = first.get("f-one"); + + FunctionDefinition updated = sampleDefinition("f-one"); + updated.setDescription("changed"); + writeMinimalNar(nar, updated); + + ReloadFunctionsResult reload = FunctionUtils.reloadFunctions( + first, dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); + closeEvicted(reload); + Map<String, FunctionArchive> second = reload.functions(); + + assertNotSame(second.get("f-one"), before); + assertThrows(IllegalStateException.class, before::getFunctionPackage); + } + + @Test + public void reloadClosesFunctionArchivesRemovedFromDirectory() throws Exception { + Path dir = Files.createTempDirectory("fn-reload-"); + Path nar1 = dir.resolve("a.nar"); + Path nar2 = dir.resolve("b.nar"); + writeMinimalNar(nar1, sampleDefinition("fn-a")); + writeMinimalNar(nar2, sampleDefinition("fn-b")); + + Map<String, FunctionArchive> first = + FunctionUtils.searchForFunctions(dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); + FunctionArchive removed = first.get("fn-b"); + Files.delete(nar2); + + ReloadFunctionsResult reload = FunctionUtils.reloadFunctions( + first, dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); + closeEvicted(reload); + Map<String, FunctionArchive> second = reload.functions(); + + assertEquals(second.size(), 1); + assertSame(second.get("fn-a"), first.get("fn-a")); + assertThrows(IllegalStateException.class, removed::getFunctionPackage); + } + + @Test + public void reloadClosesAllFunctionArchivesWhenDirectoryIsMissing() throws Exception { + Path dir = Files.createTempDirectory("fn-reload-"); + Path nar = dir.resolve("f1.nar"); + writeMinimalNar(nar, sampleDefinition("f-one")); + + Map<String, FunctionArchive> first = + FunctionUtils.searchForFunctions(dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); + FunctionArchive removed = first.get("f-one"); + Files.delete(nar); + Files.delete(dir); + + ReloadFunctionsResult reload = FunctionUtils.reloadFunctions( + first, dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); + closeEvicted(reload); + + assertTrue(reload.functions().isEmpty()); + assertThrows(IllegalStateException.class, removed::getFunctionPackage); + } +}
