This is an automated email from the ASF dual-hosted git repository. ndipiazza pushed a commit to branch TIKA-4583-ignite-config-store in repository https://gitbox.apache.org/repos/asf/tika.git
commit 83d23eaf55e9743b5f0561f8522468b4bb890980 Author: Nicholas DiPiazza <[email protected]> AuthorDate: Thu Dec 18 13:47:37 2025 -0600 TIKA-4583: Add Apache Ignite ConfigStore implementation - Added init() method to ConfigStore interface for initialization support - Created new Maven sub-module: tika-ignite-config-store - Implemented IgniteConfigStore using Apache Ignite distributed cache - Provides distributed configuration storage for Tika Pipes clustering - Supports REPLICATED and PARTITIONED cache modes - Thread-safe implementation with comprehensive error handling - Added test suite for IgniteConfigStore - Updated parent pom.xml to include new module - Added comprehensive README with usage examples --- tika-pipes/pom.xml | 1 + tika-pipes/tika-ignite-config-store/README.md | 158 ++++++++++++++++ tika-pipes/tika-ignite-config-store/pom.xml | 106 +++++++++++ .../tika/pipes/ignite/IgniteConfigStore.java | 192 +++++++++++++++++++ .../tika/pipes/ignite/IgniteConfigStoreTest.java | 207 +++++++++++++++++++++ .../apache/tika/pipes/core/config/ConfigStore.java | 11 ++ 6 files changed, 675 insertions(+) diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml index e8366313d..639551bde 100644 --- a/tika-pipes/pom.xml +++ b/tika-pipes/pom.xml @@ -38,6 +38,7 @@ <module>tika-pipes-plugins</module> <module>tika-pipes-fork-parser</module> <module>tika-async-cli</module> + <module>tika-ignite-config-store</module> <module>tika-pipes-integration-tests</module> </modules> <dependencies> diff --git a/tika-pipes/tika-ignite-config-store/README.md b/tika-pipes/tika-ignite-config-store/README.md new file mode 100644 index 000000000..d5b1508d2 --- /dev/null +++ b/tika-pipes/tika-ignite-config-store/README.md @@ -0,0 +1,158 @@ +# Apache Tika Ignite Config Store + +This module provides an Apache Ignite-based implementation of the `ConfigStore` interface for distributed configuration storage in Tika Pipes clustering deployments. 
+ +## Overview + +The `IgniteConfigStore` enables multiple Tika Pipes servers to share Fetcher, Emitter, and PipesIterator configurations across a cluster using Apache Ignite's distributed cache. + +## Features + +- **Distributed Configuration Storage**: Share configurations across multiple Tika servers +- **Thread-Safe Data Access**: `put`, `get`, `containsKey`, `keySet`, and `size` can be called from multiple threads concurrently +- **Flexible Cache Modes**: Supports both REPLICATED and PARTITIONED cache modes +- **Simple API**: Implements the standard `ConfigStore` interface +- **Sensible Defaults**: Works with default settings; only an explicit `init()` call is required before use + +## Maven Dependency + +```xml +<dependency> + <groupId>org.apache.tika</groupId> + <artifactId>tika-ignite-config-store</artifactId> + <version>4.0.0-SNAPSHOT</version> +</dependency> +``` + +## Usage + +### Basic Example + +```java +import org.apache.tika.pipes.ignite.IgniteConfigStore; +import org.apache.tika.plugins.ExtensionConfig; + +// Create and initialize the store +IgniteConfigStore store = new IgniteConfigStore(); +store.init(); + +// Store a configuration +ExtensionConfig config = new ExtensionConfig("my-fetcher", "http-fetcher", "{\"timeout\": 30000}"); +store.put("my-fetcher", config); + +// Retrieve a configuration +ExtensionConfig retrieved = store.get("my-fetcher"); + +// Clean up when done +store.close(); +``` + +### Custom Configuration + +```java +import org.apache.ignite.cache.CacheMode; + +IgniteConfigStore store = new IgniteConfigStore(); + +// Customize cache name +store.setCacheName("my-custom-cache"); + +// Set cache mode (REPLICATED or PARTITIONED) +store.setCacheMode(CacheMode.PARTITIONED); + +// Set Ignite instance name +store.setIgniteInstanceName("MyTikaCluster"); + +// Initialize (all settings must be applied before this call) +store.init(); +``` + +### Integration with Tika Pipes + +The `IgniteConfigStore` can be used as a drop-in replacement for the default `InMemoryConfigStore` in clustered Tika Pipes deployments: + +```java +// In your Tika Pipes server setup +IgniteConfigStore configStore = new IgniteConfigStore();
+configStore.init(); + +// Use with your component managers +FetcherManager fetcherManager = new FetcherManager(configStore); +EmitterManager emitterManager = new EmitterManager(configStore); +``` + +## Configuration Options + +| Property | Description | Default | +|----------|-------------|---------| +| `cacheName` | Name of the Ignite cache | `tika-config-store` | +| `cacheMode` | Cache replication mode (REPLICATED or PARTITIONED) | `REPLICATED` | +| `igniteInstanceName` | Name of the Ignite instance | `TikaIgniteConfigStore` | +| `autoClose` | Whether `close()` also stops the Ignite node; if `false`, `close()` does nothing | `true` | + +## Cache Modes + +### REPLICATED Mode (Default) +- All configurations are replicated to every node +- Faster reads, higher memory usage +- Best for small to medium configuration sets +- Provides highest availability + +### PARTITIONED Mode +- Configurations are distributed across nodes +- More memory efficient for large configuration sets +- Includes 1 backup by default +- Best for very large deployments + +## Requirements + +- Java 17 or higher +- Apache Ignite 2.16.0 +- Apache Tika 4.0.0-SNAPSHOT or higher +- On Java 11 and later, Ignite 2.x needs additional `--add-opens` JVM options (see the Apache Ignite documentation on running with Java 11 or later) + +## Thread Safety + +The data operations (`put`, `get`, `containsKey`, `keySet`, `size`) delegate to an Ignite cache and can be called from multiple threads concurrently. Call `init()` before sharing the store between threads, and call `close()` only after the other threads have finished using it. + +## Error Handling + +The data operations (`put`, `get`, `containsKey`, `keySet`, `size`) throw `IllegalStateException` if called before `init()` is invoked, or after `close()` has stopped the store: + +```java +IgniteConfigStore store = new IgniteConfigStore(); +// This will throw IllegalStateException +store.put("id", config); // ERROR: Must call init() first! + +// Correct usage +store.init(); +store.put("id", config); // OK +``` + +## Clustering Considerations + +### Multiple Tika Servers + +When running multiple Tika servers with `IgniteConfigStore`: + +1. Each server should use the same `cacheName` +2. Ensure Ignite discovery is properly configured +
3. Configurations created on one server will be available on all other servers that have joined the same Ignite cluster + +### Network Configuration + +For production deployments, Ignite discovery (multicast, TCP/IP discovery, or cloud discovery) must be set up so that the nodes can find each other. Note that `init()` currently builds its own `IgniteConfiguration` (setting only the instance name and server mode) before calling `Ignition.start()`, so custom discovery settings cannot yet be supplied through `IgniteConfigStore`; the node starts with Ignite's default discovery configuration. + +## Limitations + +- The `init()` method must be called before any other operations +- Configuration options (`cacheName`, `cacheMode`, `igniteInstanceName`) are only read by `init()`; changing them afterwards has no effect on the running store +- Requires Ignite cluster setup for distributed operation + +## See Also + +- [Apache Ignite Documentation](https://ignite.apache.org/docs/latest/) +- [Tika Pipes Documentation](https://tika.apache.org/) +- `ConfigStore` interface +- `InMemoryConfigStore` - Single-instance alternative + +## License + +Licensed under the Apache License, Version 2.0. See LICENSE file for details. diff --git a/tika-pipes/tika-ignite-config-store/pom.xml b/tika-pipes/tika-ignite-config-store/pom.xml new file mode 100644 index 000000000..507f8d485 --- /dev/null +++ b/tika-pipes/tika-ignite-config-store/pom.xml @@ -0,0 +1,106 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License.
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.tika</groupId> + <artifactId>tika-pipes</artifactId> + <version>4.0.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>tika-ignite-config-store</artifactId> + + <name>Apache Tika Ignite Config Store</name> + <url>https://tika.apache.org/</url> + + <properties> + <ignite.version>2.16.0</ignite.version> + </properties> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>tika-pipes-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>tika-plugins-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${ignite.version}</version> + </dependency> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-indexing</artifactId> + <version>${ignite.version}</version> + </dependency> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-spring</artifactId> + <version>${ignite.version}</version> + <exclusions> + <exclusion> + <groupId>org.springframework</groupId> + <artifactId>spring-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.springframework</groupId> + <artifactId>spring-beans</artifactId> + </exclusion> + <exclusion> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j2-impl</artifactId> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>tika-core</artifactId> + 
<version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifestEntries> + <Automatic-Module-Name>org.apache.tika.pipes.ignite.config</Automatic-Module-Name> + </manifestEntries> + </archive> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/tika-pipes/tika-ignite-config-store/src/main/java/org/apache/tika/pipes/ignite/IgniteConfigStore.java b/tika-pipes/tika-ignite-config-store/src/main/java/org/apache/tika/pipes/ignite/IgniteConfigStore.java new file mode 100644 index 000000000..029676b60 --- /dev/null +++ b/tika-pipes/tika-ignite-config-store/src/main/java/org/apache/tika/pipes/ignite/IgniteConfigStore.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tika.pipes.ignite; + +import java.util.Set; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.tika.pipes.core.config.ConfigStore; +import org.apache.tika.plugins.ExtensionConfig; + +/** + * Apache Ignite-based implementation of {@link ConfigStore}. + * Provides distributed configuration storage for Tika Pipes clustering. + * <p> + * This implementation is thread-safe and suitable for multi-instance deployments + * where configurations need to be shared across multiple servers. + * <p> + * Configuration options: + * <ul> + * <li>cacheName - Name of the Ignite cache (default: "tika-config-store")</li> + * <li>cacheMode - Cache replication mode: PARTITIONED or REPLICATED (default: REPLICATED)</li> + * <li>igniteInstanceName - Name of the Ignite instance (default: "TikaIgniteConfigStore")</li> + * </ul> + */ +public class IgniteConfigStore implements ConfigStore { + + private static final Logger LOG = LoggerFactory.getLogger(IgniteConfigStore.class); + private static final String DEFAULT_CACHE_NAME = "tika-config-store"; + private static final String DEFAULT_INSTANCE_NAME = "TikaIgniteConfigStore"; + + private Ignite ignite; + private IgniteCache<String, ExtensionConfig> cache; + private String cacheName = DEFAULT_CACHE_NAME; + private CacheMode cacheMode = CacheMode.REPLICATED; + private String igniteInstanceName = DEFAULT_INSTANCE_NAME; + private boolean autoClose = true; + + /** + * Default constructor. + * Call {@link #init()} before using the store. + */ + public IgniteConfigStore() { + } + + /** + * Constructor with custom cache name. 
+ * + * @param cacheName the name of the Ignite cache to use + */ + public IgniteConfigStore(String cacheName) { + this.cacheName = cacheName; + } + + @Override + public void init() throws Exception { + if (ignite != null) { + LOG.warn("IgniteConfigStore already initialized"); + return; + } + + LOG.info("Initializing IgniteConfigStore with cache: {}, mode: {}, instance: {}", + cacheName, cacheMode, igniteInstanceName); + + IgniteConfiguration cfg = new IgniteConfiguration(); + cfg.setIgniteInstanceName(igniteInstanceName); + cfg.setClientMode(false); + + ignite = Ignition.start(cfg); + + CacheConfiguration<String, ExtensionConfig> cacheCfg = new CacheConfiguration<>(cacheName); + cacheCfg.setCacheMode(cacheMode); + cacheCfg.setBackups(cacheMode == CacheMode.PARTITIONED ? 1 : 0); + + cache = ignite.getOrCreateCache(cacheCfg); + LOG.info("IgniteConfigStore initialized successfully"); + } + + @Override + public void put(String id, ExtensionConfig config) { + if (cache == null) { + throw new IllegalStateException("IgniteConfigStore not initialized. Call init() first."); + } + cache.put(id, config); + } + + @Override + public ExtensionConfig get(String id) { + if (cache == null) { + throw new IllegalStateException("IgniteConfigStore not initialized. Call init() first."); + } + return cache.get(id); + } + + @Override + public boolean containsKey(String id) { + if (cache == null) { + throw new IllegalStateException("IgniteConfigStore not initialized. Call init() first."); + } + return cache.containsKey(id); + } + + @Override + public Set<String> keySet() { + if (cache == null) { + throw new IllegalStateException("IgniteConfigStore not initialized. 
Call init() first."); + } + return Set.copyOf(cache.query(new org.apache.ignite.cache.query.ScanQuery<String, ExtensionConfig>()) + .getAll() + .stream() + .map(entry -> entry.getKey()) + .toList()); + } + + @Override + public int size() { + if (cache == null) { + throw new IllegalStateException("IgniteConfigStore not initialized. Call init() first."); + } + return cache.size(); + } + + /** + * Closes the Ignite instance and releases resources. + * Only call this when you're completely done with the store. + */ + public void close() { + if (ignite != null && autoClose) { + LOG.info("Closing IgniteConfigStore"); + ignite.close(); + ignite = null; + cache = null; + } + } + + /** + * Sets the cache name. + * + * @param cacheName the cache name + */ + public void setCacheName(String cacheName) { + this.cacheName = cacheName; + } + + /** + * Sets the cache mode. + * + * @param cacheMode the cache mode (PARTITIONED or REPLICATED) + */ + public void setCacheMode(CacheMode cacheMode) { + this.cacheMode = cacheMode; + } + + /** + * Sets the Ignite instance name. + * + * @param igniteInstanceName the instance name + */ + public void setIgniteInstanceName(String igniteInstanceName) { + this.igniteInstanceName = igniteInstanceName; + } + + /** + * Sets whether to automatically close Ignite on close(). + * + * @param autoClose true to auto-close, false to leave running + */ + public void setAutoClose(boolean autoClose) { + this.autoClose = autoClose; + } +} diff --git a/tika-pipes/tika-ignite-config-store/src/test/java/org/apache/tika/pipes/ignite/IgniteConfigStoreTest.java b/tika-pipes/tika-ignite-config-store/src/test/java/org/apache/tika/pipes/ignite/IgniteConfigStoreTest.java new file mode 100644 index 000000000..bc72ad0b5 --- /dev/null +++ b/tika-pipes/tika-ignite-config-store/src/test/java/org/apache/tika/pipes/ignite/IgniteConfigStoreTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.ignite; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.apache.tika.plugins.ExtensionConfig; + +public class IgniteConfigStoreTest { + + private IgniteConfigStore store; + + @BeforeEach + public void setUp() throws Exception { + store = new IgniteConfigStore(); + store.setIgniteInstanceName("TestIgniteInstance-" + System.currentTimeMillis()); + store.init(); + } + + @AfterEach + public void tearDown() { + if (store != null) { + store.close(); + } + } + + @Test + public void testPutAndGet() { + ExtensionConfig config = new ExtensionConfig("id1", "type1", "{\"key\":\"value\"}"); + + store.put("id1", config); + + ExtensionConfig retrieved = store.get("id1"); + assertNotNull(retrieved); + assertEquals("id1", retrieved.id()); + assertEquals("type1", retrieved.name()); + 
assertEquals("{\"key\":\"value\"}", retrieved.json()); + } + + @Test + public void testContainsKey() { + ExtensionConfig config = new ExtensionConfig("id1", "type1", "{}"); + + assertFalse(store.containsKey("id1")); + + store.put("id1", config); + + assertTrue(store.containsKey("id1")); + assertFalse(store.containsKey("nonexistent")); + } + + @Test + public void testSize() { + assertEquals(0, store.size()); + + store.put("id1", new ExtensionConfig("id1", "type1", "{}")); + assertEquals(1, store.size()); + + store.put("id2", new ExtensionConfig("id2", "type2", "{}")); + assertEquals(2, store.size()); + + store.put("id1", new ExtensionConfig("id1", "type1", "{\"updated\":true}")); + assertEquals(2, store.size()); + } + + @Test + public void testKeySet() { + assertTrue(store.keySet().isEmpty()); + + store.put("id1", new ExtensionConfig("id1", "type1", "{}")); + store.put("id2", new ExtensionConfig("id2", "type2", "{}")); + + assertEquals(2, store.keySet().size()); + assertTrue(store.keySet().contains("id1")); + assertTrue(store.keySet().contains("id2")); + assertFalse(store.keySet().contains("id3")); + } + + @Test + public void testGetNonExistent() { + assertNull(store.get("nonexistent")); + } + + @Test + public void testUpdateExisting() { + ExtensionConfig config1 = new ExtensionConfig("id1", "type1", "{\"version\":1}"); + ExtensionConfig config2 = new ExtensionConfig("id1", "type1", "{\"version\":2}"); + + store.put("id1", config1); + assertEquals("{\"version\":1}", store.get("id1").json()); + + store.put("id1", config2); + assertEquals("{\"version\":2}", store.get("id1").json()); + assertEquals(1, store.size()); + } + + @Test + public void testMultipleConfigs() { + for (int i = 0; i < 10; i++) { + String id = "config" + i; + ExtensionConfig config = new ExtensionConfig(id, "type" + i, "{\"index\":" + i + "}"); + store.put(id, config); + } + + assertEquals(10, store.size()); + + for (int i = 0; i < 10; i++) { + String id = "config" + i; + ExtensionConfig config = 
store.get(id); + assertNotNull(config); + assertEquals(id, config.id()); + assertEquals("type" + i, config.name()); + } + } + + @Test + public void testUninitializedStore() { + IgniteConfigStore uninitializedStore = new IgniteConfigStore(); + + assertThrows(IllegalStateException.class, () -> { + uninitializedStore.put("id1", new ExtensionConfig("id1", "type1", "{}")); + }); + + assertThrows(IllegalStateException.class, () -> { + uninitializedStore.get("id1"); + }); + + assertThrows(IllegalStateException.class, () -> { + uninitializedStore.containsKey("id1"); + }); + + assertThrows(IllegalStateException.class, () -> { + uninitializedStore.size(); + }); + + assertThrows(IllegalStateException.class, () -> { + uninitializedStore.keySet(); + }); + } + + @Test + public void testThreadSafety() throws InterruptedException { + int numThreads = 10; + int numOperationsPerThread = 100; + + Thread[] threads = new Thread[numThreads]; + for (int i = 0; i < numThreads; i++) { + final int threadId = i; + threads[i] = new Thread(() -> { + for (int j = 0; j < numOperationsPerThread; j++) { + String id = "thread" + threadId + "_config" + j; + ExtensionConfig config = new ExtensionConfig(id, "type", "{}"); + store.put(id, config); + assertNotNull(store.get(id)); + } + }); + threads[i].start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + assertEquals(numThreads * numOperationsPerThread, store.size()); + } + + @Test + public void testCustomCacheName() throws Exception { + IgniteConfigStore customStore = new IgniteConfigStore("custom-cache"); + customStore.setIgniteInstanceName("CustomInstance-" + System.currentTimeMillis()); + + try { + customStore.init(); + + ExtensionConfig config = new ExtensionConfig("id1", "type1", "{}"); + customStore.put("id1", config); + + assertNotNull(customStore.get("id1")); + assertEquals("id1", customStore.get("id1").id()); + } finally { + customStore.close(); + } + } +} diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java index 2f6c4c164..4f4075a48 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/config/ConfigStore.java @@ -34,6 +34,17 @@ import org.apache.tika.plugins.ExtensionConfig; */ public interface ConfigStore { + /** + * Initializes the configuration store. + * This method should be called once before using the store. + * Implementations may use this to establish connections, initialize caches, etc. + * + * @throws Exception if initialization fails + */ + default void init() throws Exception { + // Default implementation does nothing + } + /** * Stores a configuration. *
