This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new fdac94fc18 TIKA-4682 4x tweaks (#2674)
fdac94fc18 is described below
commit fdac94fc1824c7c6710dc41dbe9f7811b2ecfe78
Author: Tim Allison <[email protected]>
AuthorDate: Thu Mar 5 17:27:01 2026 -0500
TIKA-4682 4x tweaks (#2674)
* cli tweaks, tweak commandline and add assembly for tika-eval-app
---
.../src/main/java/org/apache/tika/cli/TikaCLI.java | 6 +-
tika-eval/tika-eval-app/pom.xml | 25 +++--
.../tika-eval-app/src/main/assembly/assembly.xml | 47 ++++++++
.../tika/async/cli/FileListPipesIterator.java | 122 +++++++++++++++++++++
.../org/apache/tika/async/cli/PluginsWriter.java | 118 +++++++++++++++-----
.../org/apache/tika/async/cli/TikaAsyncCLI.java | 102 +++++++++++------
.../apache/tika/async/cli/AsyncCliParserTest.java | 44 +++++++-
.../tika/async/cli/FileListPipesIteratorTest.java | 103 +++++++++++++++++
.../tika/pipes/core/async/AsyncProcessor.java | 33 ++++--
.../apache/tika/server/core/TikaServerProcess.java | 29 ++++-
10 files changed, 540 insertions(+), 89 deletions(-)
diff --git a/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
b/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
index dd677c6ac2..82be748314 100644
--- a/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
+++ b/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
@@ -405,7 +405,10 @@ public class TikaCLI {
if (arg.equals("-Z") || arg.equals("-z") ||
arg.equals("--extract") || arg.startsWith("--extract-dir")) {
return true;
}
-
+ // also match the "--fileList=<path>" form, not just "--fileList <path>"
+ if (arg.equals("--fileList") || arg.startsWith("--fileList=")) {
+ return true;
+ }
}
return false;
}
@@ -834,6 +837,8 @@ public class TikaCLI {
out.println(" -n Number of forked
processes");
out.println(" -X -Xmx in the forked
processes");
out.println(" -T Timeout in milliseconds");
+ out.println(" --fileList File list (one path per
line, relative to -i or absolute)");
+ out.println(" --handler Handler type: t=text,
h=html, x=xml, m=markdown, b=body, i=ignore");
out.println(" -Z Recursively unpack all the
attachments, too");
out.println(" --unpack-format=<format> Output format: REGULAR
(default) or FRICTIONLESS");
out.println(" --unpack-mode=<mode> Output mode: ZIPPED
(default) or DIRECTORY");
diff --git a/tika-eval/tika-eval-app/pom.xml b/tika-eval/tika-eval-app/pom.xml
index 772b438103..b145c273dd 100644
--- a/tika-eval/tika-eval-app/pom.xml
+++ b/tika-eval/tika-eval-app/pom.xml
@@ -76,22 +76,20 @@
<build>
<plugins>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
+ <artifactId>maven-assembly-plugin</artifactId> <!-- zips the jar with its runtime dependencies; see src/main/assembly/assembly.xml -->
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/assembly.xml</descriptor>
+ </descriptors>
+ <appendAssemblyId>false</appendAssemblyId> <!-- no "-bin" assembly-id suffix on the zip's file name -->
+ </configuration>
<executions>
<execution>
- <id>copy-dependencies</id>
+ <id>make-assembly</id>
<phase>package</phase>
<goals>
- <goal>copy-dependencies</goal>
+ <goal>single</goal>
</goals>
- <configuration>
- <outputDirectory>${project.build.directory}/lib</outputDirectory>
- <includeScope>runtime</includeScope>
- <stripVersion>false</stripVersion>
- <overWriteReleases>false</overWriteReleases>
- <overWriteSnapshots>false</overWriteSnapshots>
- </configuration>
</execution>
</executions>
</plugin>
@@ -128,6 +126,11 @@
<version>${maven.jar.version}</version>
<configuration>
<archive>
+ <manifest>
+ <mainClass>org.apache.tika.eval.app.TikaEvalCLI</mainClass>
+ <addClasspath>true</addClasspath>
+ <classpathPrefix>lib/</classpathPrefix> <!-- the assembly places runtime dependencies in lib/ -->
+ </manifest>
<manifestEntries>
<Automatic-Module-Name>org.apache.tika.eval.app</Automatic-Module-Name>
</manifestEntries>
diff --git a/tika-eval/tika-eval-app/src/main/assembly/assembly.xml
b/tika-eval/tika-eval-app/src/main/assembly/assembly.xml
new file mode 100644
index 0000000000..439511f5fb
--- /dev/null
+++ b/tika-eval/tika-eval-app/src/main/assembly/assembly.xml
@@ -0,0 +1,46 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.1.1"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.1.1 http://maven.apache.org/xsd/assembly-2.1.1.xsd">
+  <id>bin</id>
+  <formats>
+    <format>zip</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+
+  <dependencySets>
+    <dependencySet>
+      <!-- runtime dependencies go to lib/, matching the jar manifest's classpathPrefix -->
+      <outputDirectory>lib</outputDirectory>
+      <useProjectArtifact>false</useProjectArtifact>
+      <unpack>false</unpack>
+      <scope>runtime</scope>
+    </dependencySet>
+  </dependencySets>
+  <fileSets>
+    <fileSet>
+      <directory>${project.build.directory}</directory>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <!-- Only this module's own jar. A *.jar glob would also pick up any other
+             jar left in target/ (e.g. -tests, -sources or -javadoc jars). -->
+        <include>${project.build.finalName}.jar</include>
+      </includes>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git
a/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/FileListPipesIterator.java
b/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/FileListPipesIterator.java
new file mode 100644
index 0000000000..23237bfcaa
--- /dev/null
+++
b/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/FileListPipesIterator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.async.cli;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.InvalidPathException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.stream.Stream;
+
+import org.apache.tika.pipes.api.FetchEmitTuple;
+import org.apache.tika.pipes.api.emitter.EmitKey;
+import org.apache.tika.pipes.api.fetcher.FetchKey;
+import org.apache.tika.pipes.api.pipesiterator.PipesIterator;
+import org.apache.tika.plugins.ExtensionConfig;
+
+/**
+ * PipesIterator that reads file paths from a text file (one path per line).
+ * <p>
+ * Each entry becomes the fetch key for {@link TikaConfigAsyncWriter#FETCHER_NAME}
+ * and the emit key for {@link TikaConfigAsyncWriter#EMITTER_NAME}. Relative
+ * entries are used as-is, so the file-system fetcher (whose basePath is the input
+ * directory) resolves them. If a {@code basePath} is provided, absolute entries
+ * inside it are rewritten relative to it. Any other absolute entry keeps its
+ * absolute fetch key, while its emit key has the root stripped so that it stays
+ * relative to the emitter's output directory.
+ * <p>
+ * Lines are trimmed; blank lines and lines starting with {@code #} are skipped.
+ */
+class FileListPipesIterator implements PipesIterator {
+
+    private final Path fileListPath;
+    private final Path basePath;
+
+    /**
+     * @param fileListPath text file with one path per line
+     * @param basePath     directory the listed paths are relative to (the input
+     *                     directory); may be {@code null}
+     */
+    FileListPipesIterator(Path fileListPath, Path basePath) {
+        this.fileListPath = fileListPath;
+        this.basePath = basePath == null ? null : basePath.toAbsolutePath().normalize();
+    }
+
+    /**
+     * Opens the file list and returns a lazy iterator over its entries. The reader
+     * is closed when the iterator is exhausted or a read fails; a caller that stops
+     * iterating early leaves it open.
+     *
+     * @throws UncheckedIOException if the file list cannot be opened
+     */
+    @Override
+    public Iterator<FetchEmitTuple> iterator() {
+        BufferedReader reader;
+        try {
+            reader = Files.newBufferedReader(fileListPath);
+        } catch (IOException e) {
+            throw new UncheckedIOException("Failed to open file list: " + fileListPath, e);
+        }
+
+        return new Iterator<>() {
+            private FetchEmitTuple next;
+            private boolean done;
+            private int id;
+
+            @Override
+            public boolean hasNext() {
+                if (next != null) {
+                    return true;
+                }
+                if (done) {
+                    return false;
+                }
+                try {
+                    String line;
+                    while ((line = reader.readLine()) != null) {
+                        String trimmed = line.trim();
+                        if (isEntry(trimmed)) {
+                            next = toTuple(String.valueOf(id++), trimmed);
+                            return true;
+                        }
+                    }
+                } catch (IOException e) {
+                    finish(); // don't leak the reader when a read fails part-way
+                    throw new UncheckedIOException("Failed reading file list: " + fileListPath, e);
+                }
+                finish();
+                return false;
+            }
+
+            @Override
+            public FetchEmitTuple next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+                FetchEmitTuple t = next;
+                next = null;
+                return t;
+            }
+
+            /** Marks the iterator as exhausted and closes the reader (best effort). */
+            private void finish() {
+                done = true;
+                try {
+                    reader.close();
+                } catch (IOException ignored) {
+                    // nothing useful to do: iteration is over either way
+                }
+            }
+        };
+    }
+
+    /** Returns whether a trimmed line names a file, i.e. is neither blank nor a comment. */
+    private static boolean isEntry(String trimmed) {
+        return !trimmed.isEmpty() && !trimmed.startsWith("#");
+    }
+
+    /** Builds the fetch/emit tuple for one trimmed entry (see the class javadoc). */
+    private FetchEmitTuple toTuple(String id, String entry) {
+        String fetchKey = entry;
+        String emitKey = entry;
+        Path path = null;
+        try {
+            path = Path.of(entry);
+        } catch (InvalidPathException ignored) {
+            // not a valid local path; pass the entry through untouched
+        }
+        if (path != null && path.isAbsolute()) {
+            Path normalized = path.normalize();
+            if (basePath != null && normalized.startsWith(basePath)
+                    && !normalized.equals(basePath)) {
+                fetchKey = basePath.relativize(normalized).toString();
+                emitKey = fetchKey;
+            } else {
+                // NOTE(review): an absolute key passed to Path.resolve() would escape the
+                // emitter's basePath; confirm how the file-system emitter builds paths.
+                emitKey = normalized.getRoot().relativize(normalized).toString();
+            }
+        }
+        return new FetchEmitTuple(id,
+                new FetchKey(TikaConfigAsyncWriter.FETCHER_NAME, fetchKey),
+                new EmitKey(TikaConfigAsyncWriter.EMITTER_NAME, emitKey));
+    }
+
+    /** Counts the entries in the file list, applying the same skipping rules as the iterator. */
+    @Override
+    public Integer call() throws Exception {
+        // Files.lines holds the file open until the stream is closed
+        try (Stream<String> lines = Files.lines(fileListPath)) {
+            long count = lines.map(String::trim).filter(FileListPipesIterator::isEntry).count();
+            return (int) Math.min(count, Integer.MAX_VALUE);
+        }
+    }
+
+    @Override
+    public ExtensionConfig getExtensionConfig() {
+        // constructed directly by TikaAsyncCLI rather than from an extension config
+        return null;
+    }
+}
diff --git
a/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/PluginsWriter.java
b/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/PluginsWriter.java
index 7871bfb9a5..ef04527d95 100644
---
a/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/PluginsWriter.java
+++
b/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/PluginsWriter.java
@@ -20,15 +20,14 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Iterator;
+import java.util.Map;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.tika.config.loader.TikaObjectMapperFactory;
-import org.apache.tika.pipes.api.ParseMode;
-import org.apache.tika.pipes.core.PipesConfig;
import org.apache.tika.sax.BasicContentHandlerFactory;
import org.apache.tika.utils.StringUtils;
@@ -44,8 +43,12 @@ public class PluginsWriter {
}
void write(Path output) throws IOException {
- Path baseInput = Paths.get(simpleAsyncConfig.getInputDir());
- Path baseOutput = Paths.get(simpleAsyncConfig.getOutputDir());
+ Path baseInput = StringUtils.isBlank(simpleAsyncConfig.getInputDir())
+ ? Paths.get(".").toAbsolutePath()
+ : Paths.get(simpleAsyncConfig.getInputDir());
+ Path baseOutput = StringUtils.isBlank(simpleAsyncConfig.getOutputDir())
+ ? null
+ : Paths.get(simpleAsyncConfig.getOutputDir());
if (Files.isRegularFile(baseInput)) {
baseInput = baseInput.toAbsolutePath().getParent();
if (baseInput == null) {
@@ -69,7 +72,7 @@ public class PluginsWriter {
// Set emitter basePath
ObjectNode emitters = (ObjectNode) root.get("emitters");
- if (emitters != null && emitters.has("fse")) {
+ if (baseOutput != null && emitters != null && emitters.has("fse"))
{
ObjectNode fse = (ObjectNode) emitters.get("fse");
if (fse != null && fse.has("file-system-emitter")) {
ObjectNode fsEmitter = (ObjectNode)
fse.get("file-system-emitter");
@@ -85,27 +88,61 @@ public class PluginsWriter {
}
// Set plugin-roots
- String pluginString =
StringUtils.isBlank(simpleAsyncConfig.getPluginsDir()) ?
- "plugins" : simpleAsyncConfig.getPluginsDir();
- Path plugins = Paths.get(pluginString);
- if (Files.isDirectory(plugins)) {
- pluginString = plugins.toAbsolutePath().toString();
+ String pluginString;
+ if (!StringUtils.isBlank(simpleAsyncConfig.getPluginsDir())) {
+ pluginString = simpleAsyncConfig.getPluginsDir();
+ Path plugins = Paths.get(pluginString);
+ if (Files.isDirectory(plugins)) {
+ pluginString = plugins.toAbsolutePath().toString();
+ }
+ } else {
+ pluginString = TikaAsyncCLI.resolveDefaultPluginsDir();
}
root.put("plugin-roots", pluginString);
- // Set pipes config
- PipesConfig pipesConfig = new PipesConfig();
- pipesConfig.setNumClients(simpleAsyncConfig.getNumClients() ==
null ?
- 2 : simpleAsyncConfig.getNumClients());
+ // If the user provided a -c config, merge their settings first.
+ // This brings in parsers, parse-context, metadata-filters, and
+ // optionally pipes config (e.g. forkedJvmArgs with log4j
settings).
+ if (!StringUtils.isBlank(simpleAsyncConfig.getTikaConfig())) {
+ Path userConfigPath =
Paths.get(simpleAsyncConfig.getTikaConfig());
+ JsonNode userRoot =
objectMapper.readTree(userConfigPath.toFile());
+ // a non-object config (e.g. a JSON array) would otherwise fail with a bare ClassCastException
+ if (!(userRoot instanceof ObjectNode userObject)) {
+ throw new IOException("Expected a JSON object at the top level of " + userConfigPath);
+ }
+ mergeUserConfig(root, userObject);
+ }
+
+ // Now apply CLI overrides on top of whatever pipes config exists.
+ // This lets the user have forkedJvmArgs in their config (e.g.
log4j)
+ // while still controlling numClients and Xmx from the command
line.
+ ObjectNode pipesNode = root.get("pipes") instanceof ObjectNode existingPipes
+ ? existingPipes
+ : objectMapper.createObjectNode();
+
+ if (simpleAsyncConfig.getNumClients() != null) {
+ pipesNode.put("numClients", simpleAsyncConfig.getNumClients());
+ } else if (!pipesNode.has("numClients")) {
+ pipesNode.put("numClients", 2);
+ }
+
if (simpleAsyncConfig.getXmx() != null) {
- pipesConfig.setForkedJvmArgs(new
ArrayList<>(List.of(simpleAsyncConfig.getXmx())));
+ String xmx = simpleAsyncConfig.getXmx();
+ if (!xmx.startsWith("-")) {
+ xmx = "-Xmx" + xmx;
+ }
+ // Replace or add -Xmx in forkedJvmArgs, preserving other args
+ mergeXmxIntoJvmArgs(pipesNode, xmx, objectMapper);
}
+
if (simpleAsyncConfig.isContentOnly()) {
- pipesConfig.setParseMode(ParseMode.CONTENT_ONLY);
+ pipesNode.put("parseMode", "CONTENT_ONLY");
} else if (simpleAsyncConfig.isConcatenate()) {
- pipesConfig.setParseMode(ParseMode.CONCATENATE);
+ pipesNode.put("parseMode", "CONCATENATE");
}
+ root.set("pipes", pipesNode);
+
// For content-only mode, change the emitter file extension based
on handler type
if (simpleAsyncConfig.isContentOnly()) {
String ext =
getFileExtensionForHandlerType(simpleAsyncConfig.getHandlerType());
@@ -118,18 +155,18 @@ public class PluginsWriter {
}
}
- root.set("pipes", objectMapper.valueToTree(pipesConfig));
-
- // Write timeout limits to parse-context if configured
+ // Write timeout limits to parse-context if configured on CLI, updating any
+ // existing parse-context / timeout-limits objects (e.g. from -c) in place
if (simpleAsyncConfig.getTimeoutMs() != null) {
- ObjectNode parseContext = (ObjectNode)
root.get("parse-context");
- if (parseContext == null) {
- parseContext = objectMapper.createObjectNode();
- root.set("parse-context", parseContext);
- }
- ObjectNode timeoutNode = objectMapper.createObjectNode();
+ ObjectNode parseContext = root.get("parse-context") instanceof ObjectNode existingContext
+ ? existingContext
+ : objectMapper.createObjectNode();
+ ObjectNode timeoutNode = parseContext.get("timeout-limits") instanceof ObjectNode existingLimits
+ ? existingLimits
+ : objectMapper.createObjectNode();
timeoutNode.put("progressTimeoutMillis",
simpleAsyncConfig.getTimeoutMs());
parseContext.set("timeout-limits", timeoutNode);
+ root.set("parse-context", parseContext);
}
objectMapper.writerWithDefaultPrettyPrinter().writeValue(output.toFile(), root);
@@ -138,6 +175,40 @@ public class PluginsWriter {
}
}
+ /**
+ * Merges user config fields into the auto-generated root.
+ * All user fields override the auto-generated template values.
+ */
+ private static void mergeUserConfig(ObjectNode root, ObjectNode
userConfig) {
+ Iterator<Map.Entry<String, JsonNode>> fields = userConfig.fields();
+ while (fields.hasNext()) {
+ Map.Entry<String, JsonNode> entry = fields.next();
+ root.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ /**
+ * Merges an -Xmx arg into the forkedJvmArgs array, replacing any existing
-Xmx
+ * and preserving all other args (e.g. -Dlog4j2.configurationFile=...).
+ */
+ private static void mergeXmxIntoJvmArgs(ObjectNode pipesNode, String xmx,
+ ObjectMapper objectMapper) {
+ com.fasterxml.jackson.databind.node.ArrayNode argsArray =
+ objectMapper.createArrayNode();
+
+ // Preserve existing args, skipping any old -Xmx
+ if (pipesNode.has("forkedJvmArgs") &&
pipesNode.get("forkedJvmArgs").isArray()) {
+ for (JsonNode arg : pipesNode.get("forkedJvmArgs")) {
+ String val = arg.asText();
+ if (!val.startsWith("-Xmx")) {
+ argsArray.add(val);
+ }
+ }
+ }
+ argsArray.add(xmx);
+ pipesNode.set("forkedJvmArgs", argsArray);
+ }
+
private static String getFileExtensionForHandlerType(
BasicContentHandlerFactory.HANDLER_TYPE handlerType) {
return switch (handlerType) {
diff --git
a/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/TikaAsyncCLI.java
b/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/TikaAsyncCLI.java
index a28e0b26f9..ddfd583ef9 100644
---
a/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/TikaAsyncCLI.java
+++
b/tika-pipes/tika-async-cli/src/main/java/org/apache/tika/async/cli/TikaAsyncCLI.java
@@ -64,12 +64,13 @@ public class TikaAsyncCLI {
options.addOption("i", "inputDir", true, "input directory");
options.addOption("o", "outputDir", true, "output directory");
options.addOption("n", "numClients", true, "number of forked clients");
- options.addOption("X", "Xmx", true, "heap for the forked clients in
usual jvm heap amount, e.g. -X 1g");
- options.addOption("?", "help", false, "this help message");
+ options.addOption(null, "Xmx", true, "heap for the forked clients,
e.g. --Xmx 1g");
+ options.addOption("h", "help", false, "this help message");
options.addOption("T", "timeoutMs", true, "timeout for each parse in
milliseconds");
- options.addOption("h", "handlerType", true, "handler type: t=text,
h=html, x=xml, m=markdown, b=body, i=ignore");
+ options.addOption(null, "handler", true, "handler type: t=text,
h=html, x=xml, m=markdown, b=body, i=ignore");
options.addOption("p", "pluginsDir", true, "plugins directory");
- //options.addOption("l", "fileList", true, "file list");
+ options.addOption("l", "fileList", true,
+ "file containing one path per line (relative to inputDir or
absolute)");
options.addOption("c", "config", true, "tikaConfig.json");
options.addOption("z", "unzipShallow", false, "extract raw bytes from
direct attachments only (depth=1)");
options.addOption("Z", "unzipRecursive", false, "extract raw bytes
from all attachments recursively");
@@ -110,33 +111,15 @@ public class TikaAsyncCLI {
SimpleAsyncConfig simpleAsyncConfig = parseCommandLine(args);
- Path tikaConfig =
StringUtils.isBlank(simpleAsyncConfig.getTikaConfig()) ? null :
Paths.get(simpleAsyncConfig.getTikaConfig());
- Path tmpTikaConfig = null;
- PipesIterator pipesIterator = null;
-
+ Path tmpTikaConfig = Files.createTempFile("tika-async-tmp-", ".json"); // always generated: PluginsWriter merges any -c config into it
try {
- if (tikaConfig == null) {
- tmpTikaConfig = Files.createTempFile("tika-async-tmp-",
".json");
- tikaConfig = tmpTikaConfig;
- PluginsWriter pluginsWriter = new
PluginsWriter(simpleAsyncConfig, tikaConfig);
- pluginsWriter.write(tikaConfig);
- } else {
- // User provided a config - ensure plugin-roots is set
- tikaConfig = ensurePluginRoots(tikaConfig,
simpleAsyncConfig.getPluginsDir());
- if
(!tikaConfig.equals(Paths.get(simpleAsyncConfig.getTikaConfig()))) {
- // A new merged config was created, mark for cleanup
- tmpTikaConfig = tikaConfig;
- }
- }
+ PluginsWriter pluginsWriter = new PluginsWriter(simpleAsyncConfig,
tmpTikaConfig);
+ pluginsWriter.write(tmpTikaConfig);
- pipesIterator = buildPipesIterator(tikaConfig, simpleAsyncConfig);
-
-
- processWithTikaConfig(pipesIterator, tikaConfig,
simpleAsyncConfig);
+ PipesIterator pipesIterator = buildPipesIterator(tmpTikaConfig,
simpleAsyncConfig);
+ processWithTikaConfig(pipesIterator, tmpTikaConfig,
simpleAsyncConfig);
} finally {
- if (tmpTikaConfig != null) {
- Files.delete(tmpTikaConfig);
- }
+ Files.deleteIfExists(tmpTikaConfig);
}
}
@@ -144,6 +127,14 @@ public class TikaAsyncCLI {
private static PipesIterator buildPipesIterator(Path pluginsConfig,
SimpleAsyncConfig simpleAsyncConfig) throws TikaConfigException, IOException {
TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(pluginsConfig);
String inputDirString = simpleAsyncConfig.getInputDir();
+
+ // A file list (-l) takes precedence: return before the config/inputDir-based iterators below
+ if (!StringUtils.isBlank(simpleAsyncConfig.getFileList())) {
+ Path fileListPath = Paths.get(simpleAsyncConfig.getFileList());
+ Path basePath = StringUtils.isBlank(inputDirString) ? null :
Paths.get(inputDirString);
+ return new FileListPipesIterator(fileListPath, basePath);
+ }
+
if (StringUtils.isBlank(inputDirString)) {
Optional<PipesIterator> pipesIteratorOpt =
PipesIteratorManager.load(TikaPluginManager.load(tikaJsonConfig),
tikaJsonConfig);
if (pipesIteratorOpt.isEmpty()) {
@@ -196,8 +187,8 @@ public class TikaAsyncCLI {
if (line.hasOption("o")) {
outputDir = line.getOptionValue("o");
}
- if (line.hasOption("X")) {
- xmx = line.getOptionValue("X");
+ if (line.hasOption("Xmx")) {
+ xmx = line.getOptionValue("Xmx");
}
if (line.hasOption("T")) {
timeoutMs = Long.parseLong(line.getOptionValue("T"));
@@ -216,8 +207,8 @@ public class TikaAsyncCLI {
} else if (line.hasOption("z")) {
extractBytesMode = SimpleAsyncConfig.ExtractBytesMode.SHALLOW;
}
- if (line.hasOption('h')) {
- handlerType = getHandlerType(line.getOptionValue('h'));
+ if (line.hasOption("handler")) {
+ handlerType = getHandlerType(line.getOptionValue("handler"));
}
if (line.hasOption('a')) {
asyncConfig = line.getOptionValue('a');
@@ -286,6 +277,11 @@ public class TikaAsyncCLI {
}
}
+ // If fileList is provided without an outputDir, default to "output" under the current working directory
+ if (fileList != null && outputDir == null) {
+ outputDir = Paths.get("output").toAbsolutePath().toString();
+ }
+
return new SimpleAsyncConfig(inputDir, outputDir,
numClients, timeoutMs, xmx, fileList, tikaConfig, handlerType,
extractBytesMode, pluginsDir, concatenate, contentOnly,
@@ -391,6 +387,42 @@ public class TikaAsyncCLI {
private static final String DEFAULT_PLUGINS_DIR = "plugins";
+ /**
+ * Resolves the default plugins directory. Checks, in order: a "plugins" directory
+ * next to the directory holding the running jar (the jar typically lives in lib/),
+ * a "plugins" directory next to the jar itself, and finally the current working
+ * directory.
+ *
+ * @return the absolute path of the first candidate that exists, or "plugins" if
+ * none of them does
+ */
+ static String resolveDefaultPluginsDir() {
+ try {
+ Path jarPath = Paths.get(
+ TikaAsyncCLI.class.getProtectionDomain().getCodeSource().getLocation().toURI());
+ Path jarDir = jarPath.getParent();
+ if (jarDir != null) {
+ // jarDir.getParent() is null when jarDir is the filesystem root
+ for (Path candidateRoot : new Path[]{jarDir.getParent(), jarDir}) {
+ if (candidateRoot == null) {
+ continue;
+ }
+ Path pluginsDir = candidateRoot.resolve(DEFAULT_PLUGINS_DIR);
+ if (Files.isDirectory(pluginsDir)) {
+ return pluginsDir.toAbsolutePath().toString();
+ }
+ }
+ }
+ } catch (Exception e) {
+ // Best effort (e.g. no code source or a non-file URI): fall through to cwd-relative
+ }
+ Path cwdPlugins = Paths.get(DEFAULT_PLUGINS_DIR);
+ if (Files.isDirectory(cwdPlugins)) {
+ return cwdPlugins.toAbsolutePath().toString();
+ }
+ return DEFAULT_PLUGINS_DIR;
+ }
+
/**
* Ensures plugin-roots is set in the config. If missing, creates a merged
config
* with a default plugin-roots value.
@@ -410,10 +442,13 @@ public class TikaAsyncCLI {
// Need to add plugin-roots
ObjectNode mutableRoot = (ObjectNode) rootNode;
- String pluginString = StringUtils.isBlank(pluginsDir) ?
DEFAULT_PLUGINS_DIR : pluginsDir;
- Path plugins = Paths.get(pluginString);
- if (Files.isDirectory(plugins)) {
- pluginString = plugins.toAbsolutePath().toString();
+ String pluginString;
+ if (!StringUtils.isBlank(pluginsDir)) {
+ Path plugins = Paths.get(pluginsDir);
+ pluginString = Files.isDirectory(plugins) ?
+ plugins.toAbsolutePath().toString() : pluginsDir;
+ } else {
+ pluginString = resolveDefaultPluginsDir();
}
mutableRoot.put("plugin-roots", pluginString);
diff --git
a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncCliParserTest.java
b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncCliParserTest.java
index ef446d2fd7..8795549aab 100644
---
a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncCliParserTest.java
+++
b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncCliParserTest.java
@@ -18,6 +18,7 @@ package org.apache.tika.async.cli;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -80,7 +81,7 @@ public class AsyncCliParserTest {
@Test
public void testAll() throws Exception {
SimpleAsyncConfig simpleAsyncConfig = TikaAsyncCLI.parseCommandLine(
- new String[]{"-i", "input", "-o", "output", "-n", "5", "-T",
"30000", "-X", "1g", "-h", "x"});
+ new String[]{"-i", "input", "-o", "output", "-n", "5", "-T",
"30000", "--Xmx", "1g", "--handler", "x"});
assertEquals("input", simpleAsyncConfig.getInputDir());
assertEquals("output", simpleAsyncConfig.getOutputDir());
assertNull(simpleAsyncConfig.getFileList());
@@ -90,7 +91,42 @@ public class AsyncCliParserTest {
assertEquals(BasicContentHandlerFactory.HANDLER_TYPE.XML,
simpleAsyncConfig.getHandlerType());
}
- //TODO -- test for file list with and without inputDir
+ @Test
+ public void testFileListWithInputDir(@TempDir Path tmp) throws Exception {
+ Path fileList = tmp.resolve("files.txt");
+ Path inputDir = tmp.resolve("input");
+ Files.createDirectories(inputDir);
+ Files.writeString(fileList, "doc1.pdf\ndoc2.pdf\n");
+
+ SimpleAsyncConfig config = TikaAsyncCLI.parseCommandLine(
+ new String[]{"-l", fileList.toString(), "-i",
inputDir.toString(), "-o", "out"});
+ assertEquals(fileList.toString(), config.getFileList());
+ assertEquals(inputDir.toString(), config.getInputDir());
+ assertEquals("out", config.getOutputDir());
+ }
+
+ @Test
+ public void testFileListWithoutInputDir(@TempDir Path tmp) throws
Exception {
+ Path fileList = tmp.resolve("files.txt");
+ Files.writeString(fileList, "/absolute/path/doc1.pdf\n");
+
+ SimpleAsyncConfig config = TikaAsyncCLI.parseCommandLine(
+ new String[]{"-l", fileList.toString(), "-o", "out"});
+ assertEquals(fileList.toString(), config.getFileList());
+ assertNull(config.getInputDir());
+ assertEquals("out", config.getOutputDir());
+ }
+
+ @Test
+ public void testFileListDefaultsOutputDir(@TempDir Path tmp) throws
Exception {
+ Path fileList = tmp.resolve("files.txt");
+ Files.writeString(fileList, "doc1.pdf\n");
+
+ SimpleAsyncConfig config = TikaAsyncCLI.parseCommandLine(
+ new String[]{"-l", fileList.toString()});
+ assertEquals(fileList.toString(), config.getFileList());
+ assertNotNull(config.getOutputDir(), "outputDir should default when
fileList is used");
+ }
@TempDir
Path tempDir;
@@ -117,7 +153,10 @@ public class AsyncCliParserTest {
ObjectMapper mapper = new ObjectMapper();
JsonNode root = mapper.readTree(result.toFile());
assertTrue(root.has("plugin-roots"), "Merged config should have
plugin-roots");
- assertEquals("plugins", root.get("plugin-roots").asText());
+ String pluginRoots = root.get("plugin-roots").asText();
+ // compare the last path element so the check is separator-agnostic (Windows uses '\')
+ assertEquals("plugins", Path.of(pluginRoots).getFileName().toString(),
+ "plugin-roots should be 'plugins' or end with a 'plugins' directory, got: " + pluginRoots);
// Original config values should be preserved
assertTrue(root.has("pipes"));
diff --git
a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/FileListPipesIteratorTest.java
b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/FileListPipesIteratorTest.java
new file mode 100644
index 0000000000..5a56cabb39
--- /dev/null
+++
b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/FileListPipesIteratorTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.async.cli;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.tika.pipes.api.FetchEmitTuple;
+
+/** Unit tests for {@link FileListPipesIterator}. */
+public class FileListPipesIteratorTest {
+
+    @TempDir
+    Path tempDir;
+
+    @Test
+    public void testBasicFileList() throws Exception {
+        Path fileList = tempDir.resolve("files.txt");
+        Files.writeString(fileList, "doc1.pdf\nsubdir/doc2.txt\ndoc3.html\n");
+
+        FileListPipesIterator iter = new FileListPipesIterator(fileList, tempDir);
+        List<FetchEmitTuple> tuples = new ArrayList<>();
+        iter.iterator().forEachRemaining(tuples::add);
+
+        assertEquals(3, tuples.size());
+        assertEquals("doc1.pdf", tuples.get(0).getFetchKey().getFetchKey());
+        assertEquals("subdir/doc2.txt", tuples.get(1).getFetchKey().getFetchKey());
+        assertEquals("doc3.html", tuples.get(2).getFetchKey().getFetchKey());
+
+        assertEquals("fsf", tuples.get(0).getFetchKey().getFetcherId());
+        assertEquals("fse", tuples.get(0).getEmitKey().getEmitterId());
+    }
+
+    @Test
+    public void testSkipsBlankLinesAndComments() throws Exception {
+        Path fileList = tempDir.resolve("files.txt");
+        Files.writeString(fileList, "doc1.pdf\n\n# this is a comment\n   \ndoc2.pdf\n");
+
+        FileListPipesIterator iter = new FileListPipesIterator(fileList, null);
+        List<FetchEmitTuple> tuples = new ArrayList<>();
+        iter.iterator().forEachRemaining(tuples::add);
+
+        assertEquals(2, tuples.size());
+        assertEquals("doc1.pdf", tuples.get(0).getFetchKey().getFetchKey());
+        assertEquals("doc2.pdf", tuples.get(1).getFetchKey().getFetchKey());
+    }
+
+    @Test
+    public void testCallReturnsCount() throws Exception {
+        Path fileList = tempDir.resolve("files.txt");
+        Files.writeString(fileList, "a.pdf\nb.pdf\n# comment\n\nc.pdf\n");
+
+        FileListPipesIterator iter = new FileListPipesIterator(fileList, null);
+        assertEquals(3, iter.call());
+    }
+
+    @Test
+    public void testEmptyFile() throws Exception {
+        Path fileList = tempDir.resolve("empty.txt");
+        Files.writeString(fileList, "");
+
+        FileListPipesIterator iter = new FileListPipesIterator(fileList, null);
+        List<FetchEmitTuple> tuples = new ArrayList<>();
+        iter.iterator().forEachRemaining(tuples::add);
+
+        assertEquals(0, tuples.size());
+        assertEquals(0, iter.call());
+    }
+
+    @Test
+    public void testTrimsWhitespace() throws Exception {
+        Path fileList = tempDir.resolve("files.txt");
+        Files.writeString(fileList, "  doc1.pdf  \n  doc2.pdf\n");
+
+        FileListPipesIterator iter = new FileListPipesIterator(fileList, null);
+        List<FetchEmitTuple> tuples = new ArrayList<>();
+        iter.iterator().forEachRemaining(tuples::add);
+
+        assertEquals(2, tuples.size());
+        assertEquals("doc1.pdf", tuples.get(0).getFetchKey().getFetchKey());
+        assertEquals("doc2.pdf", tuples.get(1).getFetchKey().getFetchKey());
+    }
+
+    @Test
+    public void testNextAfterExhaustionThrows() throws Exception {
+        Path fileList = tempDir.resolve("files.txt");
+        Files.writeString(fileList, "doc1.pdf\n");
+
+        Iterator<FetchEmitTuple> it = new FileListPipesIterator(fileList, null).iterator();
+        assertTrue(it.hasNext());
+        assertEquals("doc1.pdf", it.next().getFetchKey().getFetchKey());
+        assertFalse(it.hasNext());
+        // hasNext() must stay false once the underlying reader has been closed
+        assertFalse(it.hasNext());
+        assertThrows(NoSuchElementException.class, it::next);
+    }
+
+    @Test
+    public void testMissingFileList() {
+        FileListPipesIterator iter =
+                new FileListPipesIterator(tempDir.resolve("does-not-exist.txt"), null);
+        assertThrows(RuntimeException.class, iter::iterator);
+        assertThrows(IOException.class, iter::call);
+    }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
index cb92aa76ba..cc5f424af5 100644
---
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
@@ -247,19 +247,37 @@ public class AsyncProcessor implements Closeable {
return fetchEmitTuples.remainingCapacity();
}
- public synchronized boolean offer(FetchEmitTuple t, long offerMs)
+ public boolean offer(FetchEmitTuple t, long offerMs)
throws PipesException, InterruptedException {
if (fetchEmitTuples == null) {
throw new IllegalStateException("queue hasn't been initialized
yet.");
- } else if (isShuttingDown) {
- throw new IllegalStateException(
- "Can't call offer after calling close() or " +
"shutdownNow()");
}
- if (applicationErrorOccurred.get()) {
- throw new PipesException("Can't call offer after an application
error occurred");
+ // Monotonic clock, so wall-clock adjustments can't shorten or stretch the wait.
+ long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(Math.max(0, offerMs));
+ while (true) {
+ synchronized (this) {
+ if (isShuttingDown) {
+ throw new IllegalStateException(
+ "Can't call offer after calling close() or shutdownNow()");
+ }
+ if (applicationErrorOccurred.get()) {
+ throw new PipesException(
+ "Can't call offer after an application error occurred");
+ }
+ checkActive();
+ }
+ // Offer in slices of at most 1s outside the synchronized block so other
+ // threads (e.g. the watcher) can still call checkActive(). The first slice
+ // always runs, so offerMs <= 0 still makes one non-blocking attempt.
+ long remainingNanos = deadlineNanos - System.nanoTime();
+ long sliceNanos = Math.max(0, Math.min(remainingNanos, TimeUnit.SECONDS.toNanos(1)));
+ if (fetchEmitTuples.offer(t, sliceNanos, TimeUnit.NANOSECONDS)) {
+ return true;
+ }
+ if (deadlineNanos - System.nanoTime() <= 0) {
+ return false;
+ }
}
- checkActive();
- return fetchEmitTuples.offer(t, offerMs, TimeUnit.MILLISECONDS);
}
/**
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
index 5028c09000..3111d2f529 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
@@ -549,10 +549,33 @@ public class TikaServerProcess {
}
}
+ private static final String DEFAULT_PLUGINS_DIR = "plugins";
+
/**
- * Default plugins directory name, relative to current working directory.
+ * Resolves the default plugins directory. Looks for a "plugins" directory
+ * next to the running jar first, then falls back to the current working directory.
+ *
+ * @return an absolute path; the cwd fallback is returned even if it does not exist
*/
- private static final String DEFAULT_PLUGINS_DIR = "plugins";
+ private static String resolveDefaultPluginsDir() {
+ try {
+ Path jarPath = Path.of(
+ TikaServerProcess.class.getProtectionDomain()
+ .getCodeSource().getLocation().toURI());
+ Path jarDir = jarPath.getParent();
+ if (jarDir != null) {
+ Path pluginsNextToJar = jarDir.resolve(DEFAULT_PLUGINS_DIR);
+ if (Files.isDirectory(pluginsNextToJar)) {
+ return pluginsNextToJar.toAbsolutePath().toString();
+ }
+ }
+ } catch (Exception e) {
+ // Fall through to cwd-relative
+ }
+ // Fall back to ./plugins as an absolute path, even if it does not exist, so the
+ // configured plugin root stays anchored to this process's working directory.
+ return Path.of(DEFAULT_PLUGINS_DIR).toAbsolutePath().toString();
+ }
/**
* Creates or merges server configuration using ConfigMerger.
@@ -586,7 +609,7 @@ public class TikaServerProcess {
// Use PASSBACK_ALL strategy - results returned through socket
.setEmitStrategy(EmitStrategy.PASSBACK_ALL)
// Set plugin roots
-
.setPluginRoots(Path.of(DEFAULT_PLUGINS_DIR).toAbsolutePath().toString());
+ .setPluginRoots(resolveDefaultPluginsDir());
// Only set default pipes config if there's no existing config
// This allows user-provided config to specify their own numClients,
etc.