This is an automated email from the ASF dual-hosted git repository.
ndipiazza pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new c4c3441cd TIKA-4579: Add ability to update fetcher/emitter
configurations (#2465)
c4c3441cd is described below
commit c4c3441cd2b5356f90ffa85e74ecd87655839944
Author: Nicholas DiPiazza <[email protected]>
AuthorDate: Wed Dec 17 13:53:58 2025 -0600
TIKA-4579: Add ability to update fetcher/emitter configurations (#2465)
* TIKA-4579: Add ability to update fetcher/emitter configurations
- Modified AbstractComponentManager.saveComponent() to allow updates
instead of throwing an exception when the component ID already exists
- When updating an existing component, its cached instance is cleared to force
re-instantiation
- Removed reflection-based hack in TikaGrpcServerImpl.saveFetcher()
- Updated FetcherManager and EmitterManager javadocs
- Updated FetcherManagerTest to verify update behavior instead of expecting an exception
This enables the gRPC server to properly update fetcher configurations
at runtime without using reflection hacks, as described in TIKA-4579.
* Fix EmitterManagerTest to match new update behavior
- Updated testSaveEmitterDuplicate to verify update behavior instead of
expecting an exception
- Now verifies that updating an emitter clears the cache and creates a new
instance
- Matches the changes made to FetcherManagerTest
- Fixes build failure in CI
---------
Co-authored-by: Nicholas DiPiazza <[email protected]>
---
.../apache/tika/pipes/grpc/TikaGrpcServerImpl.java | 35 +---------------------
.../tika/pipes/core/AbstractComponentManager.java | 21 ++++++-------
.../tika/pipes/core/emitter/EmitterManager.java | 6 ++--
.../tika/pipes/core/fetcher/FetcherManager.java | 6 ++--
.../pipes/core/emitter/EmitterManagerTest.java | 25 +++++++++++-----
.../pipes/core/fetcher/FetcherManagerTest.java | 25 +++++++++++-----
6 files changed, 54 insertions(+), 64 deletions(-)
diff --git
a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index c7e268c1d..29632929f 100644
--- a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -200,40 +200,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
try {
String factoryName =
findFactoryNameForClass(request.getFetcherClass());
ExtensionConfig config = new
ExtensionConfig(request.getFetcherId(), factoryName,
request.getFetcherConfigJson());
-
- // Check if fetcher already exists, if so, we need to update it
- if
(fetcherManager.getSupported().contains(request.getFetcherId())) {
- LOG.info("Updating existing fetcher: {}",
request.getFetcherId());
- // We can't update directly through the API, so we use
reflection to update
- // the configStore's internal map and clear the componentCache
- try {
- // Get the AbstractComponentManager superclass
- Class<?> superClass =
fetcherManager.getClass().getSuperclass();
-
- // Access the configStore field
- java.lang.reflect.Field configStoreField =
superClass.getDeclaredField("configStore");
- configStoreField.setAccessible(true);
- Object configStore = configStoreField.get(fetcherManager);
-
- // For InMemoryConfigStore, access its internal map to
replace the config
- java.lang.reflect.Field storeField =
configStore.getClass().getDeclaredField("store");
- storeField.setAccessible(true);
- @SuppressWarnings("unchecked")
- java.util.Map<String, ExtensionConfig> store =
- (java.util.Map<String, ExtensionConfig>)
storeField.get(configStore);
- store.put(config.id(), config);
-
- // Also clear the cache so it gets re-instantiated
- java.lang.reflect.Field cacheField =
superClass.getDeclaredField("componentCache");
- cacheField.setAccessible(true);
- @SuppressWarnings("unchecked") java.util.Map<String,
Fetcher> cache = (java.util.Map<String, Fetcher>)
cacheField.get(fetcherManager);
- cache.remove(config.id());
- } catch (Exception e) {
- throw new RuntimeException("Failed to update fetcher", e);
- }
- } else {
- fetcherManager.saveFetcher(config);
- }
+ fetcherManager.saveFetcher(config);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
index 059cfc839..4bfe20383 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/AbstractComponentManager.java
@@ -257,16 +257,16 @@ public abstract class AbstractComponentManager<T extends
TikaExtension,
}
/**
- * Dynamically adds a component configuration at runtime.
+ * Dynamically adds or updates a component configuration at runtime.
* The component will not be instantiated until it is first requested via
{@link #getComponent(String)}.
+ * If a component with the same ID already exists, it will be replaced and
the cached instance cleared.
* <p>
* This method is only available if the manager was loaded with
allowRuntimeModifications=true.
* <p>
* Only authorized/authenticated users should be allowed to modify
components. BE CAREFUL.
*
* @param config the extension configuration for the component
- * @throws TikaConfigException if the component type is unknown, if a
component with the same ID already exists,
- * or if runtime modifications are not allowed
+ * @throws TikaConfigException if the component type is unknown or if
runtime modifications are not allowed
* @throws IOException if there is an error accessing the plugin manager
*/
public synchronized void saveComponent(ExtensionConfig config) throws
TikaConfigException, IOException {
@@ -284,12 +284,6 @@ public abstract class AbstractComponentManager<T extends
TikaExtension,
String componentId = config.id();
String typeName = config.name();
- // Check for duplicate ID
- if (configStore.containsKey(componentId)) {
- throw new TikaConfigException(getComponentName().substring(0,
1).toUpperCase(Locale.ROOT) +
- getComponentName().substring(1) + " with id '" +
componentId + "' already exists");
- }
-
// Validate that factory exists for this type
Map<String, F> factories = getFactories(pluginManager);
if (!factories.containsKey(typeName)) {
@@ -298,9 +292,16 @@ public abstract class AbstractComponentManager<T extends
TikaExtension,
". Available: " + factories.keySet());
}
+ // If updating existing component, clear the cache so it gets
re-instantiated
+ if (configStore.containsKey(componentId)) {
+ componentCache.remove(componentId);
+ LOG.debug("Updating existing {} config: id={}, type={}",
getComponentName(), componentId, typeName);
+ } else {
+ LOG.debug("Creating new {} config: id={}, type={}",
getComponentName(), componentId, typeName);
+ }
+
// Store config without instantiating
configStore.put(componentId, config);
- LOG.debug("Saved {} config: id={}, type={}", getComponentName(),
componentId, typeName);
}
/**
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
index 05551c112..e424fc35a 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
@@ -157,9 +157,10 @@ public class EmitterManager extends
AbstractComponentManager<Emitter, EmitterFac
}
/**
- * Dynamically adds an emitter configuration at runtime.
+ * Dynamically adds or updates an emitter configuration at runtime.
* The emitter will not be instantiated until it is first requested via
{@link #getEmitter(String)}.
* This allows for dynamic configuration without the overhead of immediate
instantiation.
+ * If an emitter with the same ID already exists, it will be replaced and
the cached instance cleared.
* <p>
* This method is only available if the EmitterManager was loaded with
* {@link #load(PluginManager, TikaJsonConfig, boolean)} with
allowRuntimeModifications=true.
@@ -167,8 +168,7 @@ public class EmitterManager extends
AbstractComponentManager<Emitter, EmitterFac
* Only authorized/authenticated users should be allowed to modify
emitters. BE CAREFUL.
*
* @param config the extension configuration for the emitter
- * @throws TikaConfigException if the emitter type is unknown, if an
emitter with the same ID
- * already exists, or if runtime modifications
are not allowed
+ * @throws TikaConfigException if the emitter type is unknown or if
runtime modifications are not allowed
* @throws IOException if there is an error accessing the plugin manager
*/
public void saveEmitter(ExtensionConfig config) throws
TikaConfigException, IOException {
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
index c9ed4c3bf..e282b8495 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
@@ -156,9 +156,10 @@ public class FetcherManager extends
AbstractComponentManager<Fetcher, FetcherFac
}
/**
- * Dynamically adds a fetcher configuration at runtime.
+ * Dynamically adds or updates a fetcher configuration at runtime.
* The fetcher will not be instantiated until it is first requested via
{@link #getFetcher(String)}.
* This allows for dynamic configuration without the overhead of immediate
instantiation.
+ * If a fetcher with the same ID already exists, it will be replaced and
the cached instance cleared.
* <p>
* This method is only available if the FetcherManager was loaded with
* {@link #load(PluginManager, TikaJsonConfig, boolean)} with
allowRuntimeModifications=true.
@@ -166,8 +167,7 @@ public class FetcherManager extends
AbstractComponentManager<Fetcher, FetcherFac
* Only authorized/authenticated users should be allowed to modify
fetchers. BE CAREFUL.
*
* @param config the extension configuration for the fetcher
- * @throws TikaConfigException if the fetcher type is unknown, if a
fetcher with the same ID
- * already exists, or if runtime modifications
are not allowed
+ * @throws TikaConfigException if the fetcher type is unknown or if
runtime modifications are not allowed
* @throws IOException if there is an error accessing the plugin manager
*/
public void saveFetcher(ExtensionConfig config) throws
TikaConfigException, IOException {
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/emitter/EmitterManagerTest.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/emitter/EmitterManagerTest.java
index e2990abd9..a50021e25 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/emitter/EmitterManagerTest.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/emitter/EmitterManagerTest.java
@@ -385,21 +385,32 @@ public class EmitterManagerTest {
EmitterManager emitterManager = EmitterManager.load(pluginManager,
tikaJsonConfig, true);
- // Try to add an emitter with the same ID as existing one
+ // Update existing emitter with new configuration
String newConfigJson = String.format(Locale.ROOT, """
{
"basePath": "%s",
"onExists": "REPLACE"
}
""", JsonConfigHelper.toJsonPath(tmpDir.resolve("output2")));
- ExtensionConfig duplicateConfig = new ExtensionConfig("fse",
"file-system-emitter", newConfigJson);
+ ExtensionConfig updatedConfig = new ExtensionConfig("fse",
"file-system-emitter", newConfigJson);
- TikaConfigException exception =
assertThrows(TikaConfigException.class, () -> {
- emitterManager.saveEmitter(duplicateConfig);
- });
+ // Get original emitter instance
+ Emitter originalEmitter = emitterManager.getEmitter("fse");
+ assertNotNull(originalEmitter);
+
+ // Update the emitter config
+ emitterManager.saveEmitter(updatedConfig);
+
+ // Should still only have 1 emitter
+ assertEquals(1, emitterManager.getSupported().size());
+ assertTrue(emitterManager.getSupported().contains("fse"));
- assertTrue(exception.getMessage().contains("already exists"));
- assertTrue(exception.getMessage().contains("fse"));
+ // Getting the emitter again should return a NEW instance (cache
cleared)
+ Emitter updatedEmitter = emitterManager.getEmitter("fse");
+ assertNotNull(updatedEmitter);
+
+ // Should be different instance due to re-instantiation
+ assertTrue(originalEmitter != updatedEmitter, "Updated emitter should
be a new instance");
}
@Test
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/fetcher/FetcherManagerTest.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/fetcher/FetcherManagerTest.java
index f41fb4a19..6bbadb2e8 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/fetcher/FetcherManagerTest.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/fetcher/FetcherManagerTest.java
@@ -376,18 +376,29 @@ public class FetcherManagerTest {
FetcherManager fetcherManager = FetcherManager.load(pluginManager,
tikaJsonConfig, true);
- // Try to add a fetcher with the same ID as existing one
+ // Update existing fetcher with new configuration
String newConfigJson = String.format(Locale.ROOT, """
{"basePath": "%s"}
""", JsonConfigHelper.toJsonPath(tmpDir.resolve("path2")));
- ExtensionConfig duplicateConfig = new ExtensionConfig("fsf",
"file-system-fetcher", newConfigJson);
+ ExtensionConfig updatedConfig = new ExtensionConfig("fsf",
"file-system-fetcher", newConfigJson);
- TikaConfigException exception =
assertThrows(TikaConfigException.class, () -> {
- fetcherManager.saveFetcher(duplicateConfig);
- });
+ // Get original fetcher instance
+ Fetcher originalFetcher = fetcherManager.getFetcher("fsf");
+ assertNotNull(originalFetcher);
+
+ // Update the fetcher config
+ fetcherManager.saveFetcher(updatedConfig);
+
+ // Should still only have 1 fetcher
+ assertEquals(1, fetcherManager.getSupported().size());
+ assertTrue(fetcherManager.getSupported().contains("fsf"));
- assertTrue(exception.getMessage().contains("already exists"));
- assertTrue(exception.getMessage().contains("fsf"));
+ // Getting the fetcher again should return a NEW instance (cache
cleared)
+ Fetcher updatedFetcher = fetcherManager.getFetcher("fsf");
+ assertNotNull(updatedFetcher);
+
+ // Should be different instance due to re-instantiation
+ assertTrue(originalFetcher != updatedFetcher, "Updated fetcher should
be a new instance");
}
@Test