This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch TIKA-4628 in repository https://gitbox.apache.org/repos/asf/tika.git
commit a81d64dcded0f8025dc4d37eb955c4084df8472c Author: tallison <[email protected]> AuthorDate: Thu Jan 22 15:31:39 2026 -0500 TIKA-4628 -- improve pipesClient+pipesServer ipc: critical socket.setTcpNoDelay(true) and migrate to pure jackson serialization --- .../tika/pipes/api/emitter/AbstractEmitter.java | 7 +++- .../pipes/api/emitter/AbstractStreamEmitter.java | 7 +++- .../apache/tika/pipes/api/emitter/EmitData.java | 6 ++- .../org/apache/tika/pipes/core/PipesClient.java | 44 +++++++++----------- .../tika/pipes/core/emitter/EmitDataImpl.java | 32 +++++++-------- .../apache/tika/pipes/core/server/PipesServer.java | 48 ++++++++-------------- .../apache/tika/pipes/core/PassbackFilterTest.java | 24 ++--------- .../apache/tika/pipes/core/PipesClientTest.java | 24 +++++++---- .../tika/pipes/emitter/jdbc/JDBCEmitter.java | 6 ++- 9 files changed, 93 insertions(+), 105 deletions(-) diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java index 324d16ace9..250b3df9d9 100644 --- a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java +++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java @@ -19,6 +19,7 @@ package org.apache.tika.pipes.api.emitter; import java.io.IOException; import java.util.List; +import org.apache.tika.parser.ParseContext; import org.apache.tika.plugins.AbstractTikaExtension; import org.apache.tika.plugins.ExtensionConfig; @@ -31,7 +32,11 @@ public abstract class AbstractEmitter extends AbstractTikaExtension implements E @Override public void emit(List<? extends EmitData> emitData) throws IOException { for (EmitData item : emitData) { - emit(item.getEmitKey(), item.getMetadataList(), item.getParseContext()); + ParseContext parseContext = item.getParseContext(); + if (parseContext == null) { + parseContext = new ParseContext(); + } + emit(item.getEmitKey(), item.getMetadataList(), parseContext); } } } diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java index 892180d8c7..98727205f1 100644 --- a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java +++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java @@ -19,6 +19,7 @@ package org.apache.tika.pipes.api.emitter; import java.io.IOException; import java.util.List; +import org.apache.tika.parser.ParseContext; import org.apache.tika.plugins.AbstractTikaExtension; import org.apache.tika.plugins.ExtensionConfig; @@ -31,7 +32,11 @@ public abstract class AbstractStreamEmitter extends AbstractTikaExtension implem @Override public void emit(List<? extends EmitData> emitData) throws IOException { for (EmitData item : emitData) { - emit(item.getEmitKey(), item.getMetadataList(), item.getParseContext()); + ParseContext parseContext = item.getParseContext(); + if (parseContext == null) { + parseContext = new ParseContext(); + } + emit(item.getEmitKey(), item.getMetadataList(), parseContext); } } } diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java index 6af852a63b..ec6266ed23 100644 --- a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java +++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java @@ -30,6 +30,10 @@ public interface EmitData { long getEstimatedSizeBytes(); + /** + * Gets the ParseContext. This is not serialized over IPC - it's restored + * by PipesClient after deserialization from the original FetchEmitTuple. + * May return null if not set. + */ ParseContext getParseContext(); - } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java index e79c35ccfe..800930aba9 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java @@ -24,13 +24,14 @@ import static org.apache.tika.pipes.core.PipesClient.COMMANDS.ACK; import static org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.FINISHED; import static org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.READY; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; @@ -49,7 +50,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; -import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream; import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +62,7 @@ import org.apache.tika.pipes.api.FetchEmitTuple; import org.apache.tika.pipes.api.PipesResult; import org.apache.tika.pipes.api.emitter.EmitKey; import org.apache.tika.pipes.core.emitter.EmitDataImpl; +import org.apache.tika.pipes.core.serialization.JsonPipesIpc; import org.apache.tika.pipes.core.server.IntermediateResult; import org.apache.tika.pipes.core.server.PipesServer; import org.apache.tika.utils.ExceptionUtils; @@ -254,14 +255,7 @@ public class PipesClient implements Closeable { private void writeTask(FetchEmitTuple t) throws IOException { LOG.debug("pipesClientId={}: sending NEW_REQUEST for id={}", pipesClientId, t.getId()); - UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream - .builder() - .get(); - try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) { - objectOutputStream.writeObject(t); - } - - byte[] bytes = bos.toByteArray(); + byte[] bytes = JsonPipesIpc.toBytes(t); serverTuple.output.write(COMMANDS.NEW_REQUEST.getByte()); serverTuple.output.writeInt(bytes.length); serverTuple.output.write(bytes); @@ -312,7 +306,12 @@ public class PipesClient implements Closeable { lastUpdate = Instant.now(); break; case FINISHED: - return readResult(PipesResult.class); + PipesResult result = readResult(PipesResult.class); + // Restore ParseContext from original FetchEmitTuple (not serialized back from server) + if (result.emitData() instanceof EmitDataImpl emitDataImpl) { + emitDataImpl.setParseContext(t.getParseContext()); + } + return result; } } catch (SocketTimeoutException e) { LOG.warn("clientId={}: Socket timeout exception while waiting for server", pipesClientId, e); @@ -419,19 +418,12 @@ public class PipesClient implements Closeable { return status; } - private <T> T readResult(Class<? extends T> clazz) throws IOException { + private <T> T readResult(Class<T> clazz) throws IOException { int len = serverTuple.input.readInt(); byte[] bytes = new byte[len]; serverTuple.input.readFully(bytes); writeAck(); - try (ObjectInputStream objectInputStream = new ObjectInputStream(UnsynchronizedByteArrayInputStream - .builder() - .setByteArray(bytes) - .get())) { - return clazz.cast(objectInputStream.readObject()); - } catch (ClassNotFoundException e) { - throw new IOException(e); - } + return JsonPipesIpc.fromBytes(bytes, clazz); } private void writeAck() throws IOException { @@ -441,7 +433,9 @@ public class PipesClient implements Closeable { private void restart() throws InterruptedException, IOException, TimeoutException { - ServerSocket serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress()); + ServerSocket serverSocket = new ServerSocket(); + serverSocket.setReuseAddress(true); + serverSocket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 50); int port = serverSocket.getLocalPort(); if (serverTuple != null && serverTuple.process != null) { int oldPort = serverTuple.serverSocket.getLocalPort(); @@ -492,8 +486,10 @@ public class PipesClient implements Closeable { } } socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs()); - serverTuple = new ServerTuple(process, serverSocket, socket, new DataInputStream(socket.getInputStream()), - new DataOutputStream(socket.getOutputStream()), tmpDir); + socket.setTcpNoDelay(true); + serverTuple = new ServerTuple(process, serverSocket, socket, + new DataInputStream(new BufferedInputStream(socket.getInputStream())), + new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())), tmpDir); waitForStartup(); } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java index 1aee991f11..930d594918 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java @@ -16,7 +16,6 @@ */ package org.apache.tika.pipes.core.emitter; -import java.io.Serializable; import java.util.List; import org.apache.tika.metadata.Metadata; @@ -24,31 +23,23 @@ import org.apache.tika.parser.ParseContext; import org.apache.tika.pipes.api.emitter.EmitData; import org.apache.tika.utils.StringUtils; -public class EmitDataImpl implements Serializable, EmitData { - /** - * Serial version UID - */ - private static final long serialVersionUID = -3861669115439125268L; +public class EmitDataImpl implements EmitData { private final String emitKey; private final List<Metadata> metadataList; private final String containerStackTrace; - private ParseContext parseContext = null; + // ParseContext is not serialized - it's set by PipesClient after deserialization + private ParseContext parseContext; public EmitDataImpl(String emitKey, List<Metadata> metadataList) { this(emitKey, metadataList, StringUtils.EMPTY); } public EmitDataImpl(String emitKey, List<Metadata> metadataList, String containerStackTrace) { - this(emitKey, metadataList, containerStackTrace, new ParseContext()); - } - - public EmitDataImpl(String emitKey, List<Metadata> metadataList, String containerStackTrace, ParseContext parseContext) { this.emitKey = emitKey; this.metadataList = metadataList; this.containerStackTrace = (containerStackTrace == null) ? StringUtils.EMPTY : containerStackTrace; - this.parseContext = parseContext; } public String getEmitKey() { @@ -67,14 +58,23 @@ public class EmitDataImpl implements Serializable, EmitData { return estimateSizeInBytes(getEmitKey(), getMetadataList(), containerStackTrace); } - public void setParseContext(ParseContext parseContext) { - this.parseContext = parseContext; - } - + /** + * Gets the ParseContext. This is not serialized - it's set by PipesClient + * after deserialization from the original FetchEmitTuple. + */ + @Override public ParseContext getParseContext() { return parseContext; } + /** + * Sets the ParseContext. Called by PipesClient after deserialization + * to restore the ParseContext from the original FetchEmitTuple. + */ + public void setParseContext(ParseContext parseContext) { + this.parseContext = parseContext; + } + private static long estimateSizeInBytes(String id, List<Metadata> metadataList, String containerStackTrace) { long sz = 36 + id.length() * 2; diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java index 66d5d9ae50..8ac17d1b27 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java @@ -22,12 +22,11 @@ import static org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.IN import static org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.OOM; import static org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.TIMEOUT; -import java.io.ByteArrayOutputStream; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; @@ -47,8 +46,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream; -import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xml.sax.SAXException; @@ -74,6 +71,7 @@ import org.apache.tika.pipes.core.config.ConfigStore; import org.apache.tika.pipes.core.config.ConfigStoreFactory; import org.apache.tika.pipes.core.emitter.EmitterManager; import org.apache.tika.pipes.core.fetcher.FetcherManager; +import org.apache.tika.pipes.core.serialization.JsonPipesIpc; import org.apache.tika.plugins.ExtensionConfig; import org.apache.tika.plugins.TikaPluginManager; import org.apache.tika.sax.ContentHandlerFactory; @@ -166,8 +164,8 @@ public class PipesServer implements AutoCloseable { Socket socket = new Socket(); socket.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), port), PipesClient.SOCKET_CONNECT_TIMEOUT_MS); - DataInputStream dis = new DataInputStream(socket.getInputStream()); - DataOutputStream dos = new DataOutputStream(socket.getOutputStream()); + DataInputStream dis = new DataInputStream(new BufferedInputStream(socket.getInputStream())); + DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())); try { TikaLoader tikaLoader = TikaLoader.load(tikaConfigPath); TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig(); @@ -175,6 +173,7 @@ public class PipesServer implements AutoCloseable { // Set socket timeout from config after loading PipesConfig socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs()); + socket.setTcpNoDelay(true); MetadataFilter metadataFilter = tikaLoader.loadMetadataFilters(); ContentHandlerFactory contentHandlerFactory = tikaLoader.loadContentHandlerFactory(); @@ -409,10 +408,9 @@ public class PipesServer implements AutoCloseable { private void handleCrash(PROCESSING_STATUS processingStatus, String id, Throwable t) { LOG.error("{}: {}", processingStatus, id, t); String msg = (t != null) ? ExceptionUtils.getStackTrace(t) : ""; - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - try (ObjectOutputStream oos = new ObjectOutputStream(bos)) { - oos.writeObject(msg); - write(processingStatus, bos.toByteArray()); + try { + byte[] bytes = JsonPipesIpc.toBytes(msg); + write(processingStatus, bytes); awaitAck(); } catch (IOException e) { //swallow @@ -443,18 +441,12 @@ public class PipesServer implements AutoCloseable { int length = input.readInt(); byte[] bytes = new byte[length]; input.readFully(bytes); - try (ObjectInputStream objectInputStream = new ObjectInputStream( - UnsynchronizedByteArrayInputStream.builder().setByteArray(bytes).get())) { - return (FetchEmitTuple) objectInputStream.readObject(); - } + return JsonPipesIpc.fromBytes(bytes, FetchEmitTuple.class); } catch (IOException e) { - LOG.error("problem reading tuple", e); - exit(1); - } catch (ClassNotFoundException e) { - LOG.error("can't find class?!", e); - exit(1); + LOG.error("problem reading/deserializing FetchEmitTuple", e); + handleCrash(PROCESSING_STATUS.UNSPECIFIED_CRASH, "unknown", e); } - //unreachable, no?! + //unreachable - handleCrash calls exit return null; } @@ -516,11 +508,8 @@ public class PipesServer implements AutoCloseable { private void write(PROCESSING_STATUS processingStatus, PipesResult pipesResult) { try { - UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get(); - try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) { - objectOutputStream.writeObject(pipesResult); - } - write(processingStatus, bos.toByteArray()); + byte[] bytes = JsonPipesIpc.toBytes(pipesResult); + write(processingStatus, bytes); } catch (IOException e) { LOG.error("problem writing emit data (forking process shutdown?)", e); exit(1); @@ -529,11 +518,8 @@ public class PipesServer implements AutoCloseable { private void writeIntermediate(Metadata metadata) { try { - UnsynchronizedByteArrayOutputStream bos = UnsynchronizedByteArrayOutputStream.builder().get(); - try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) { - objectOutputStream.writeObject(metadata); - } - write(INTERMEDIATE_RESULT, bos.toByteArray()); + byte[] bytes = JsonPipesIpc.toBytes(metadata); + write(INTERMEDIATE_RESULT, bytes); } catch (IOException e) { LOG.error("problem writing intermediate data (forking process shutdown?)", e); exit(1); diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java index a3a55bb372..853136855d 100644 --- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java +++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java @@ -23,14 +23,12 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.List; -import java.util.Locale; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.apache.tika.config.loader.TikaJsonConfig; -import org.apache.tika.exception.TikaException; import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; import org.apache.tika.parser.ParseContext; @@ -39,7 +37,6 @@ import org.apache.tika.pipes.api.PipesResult; import org.apache.tika.pipes.api.emitter.EmitKey; import org.apache.tika.pipes.api.fetcher.FetchKey; import org.apache.tika.serialization.JsonMetadataList; -import org.apache.tika.utils.StringUtils; public class PassbackFilterTest { @@ -63,7 +60,9 @@ public class PassbackFilterTest { init(tmpDir); String emitFileBase = "blah"; ParseContext parseContext = new ParseContext(); - parseContext.set(PassbackFilter.class, new MyPassbackFilter()); + // Use JSON config approach for Jackson serialization compatibility + // Don't resolve here - let PipesServer resolve on its side + parseContext.setJsonConfig("mock-passback-filter", "{}"); PipesResult pipesResult = pipesClient.process( new FetchEmitTuple(testPdfFile, new FetchKey(fetcherId, testPdfFile), new EmitKey(emitterId, emitFileBase), new Metadata(), parseContext, @@ -96,21 +95,4 @@ public class PassbackFilterTest { .get(0) .get(Metadata.CONTENT_LENGTH)); } - - private static class MyPassbackFilter extends PassbackFilter { - @Override - public void filter(List<Metadata> metadataList) throws TikaException { - // Remove items without RESOURCE_NAME_KEY and transform remaining ones - metadataList.removeIf(m -> StringUtils.isBlank(m.get(TikaCoreProperties.RESOURCE_NAME_KEY))); - for (Metadata m : metadataList) { - String val = m.get(TikaCoreProperties.RESOURCE_NAME_KEY); - // Clear all fields and only keep RESOURCE_NAME_KEY (uppercased) - for (String name : m.names()) { - m.remove(name); - } - m.set(TikaCoreProperties.RESOURCE_NAME_KEY, val.toUpperCase(Locale.ROOT)); - } - } - } - } diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java index 1cba2622ac..ee18f7a369 100644 --- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java +++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java @@ -22,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.util.List; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -32,10 +31,8 @@ import org.apache.tika.config.TikaTaskTimeout; import org.apache.tika.config.loader.TikaJsonConfig; import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; -import org.apache.tika.metadata.filter.AttachmentCountingListFilter; import org.apache.tika.metadata.filter.CompositeMetadataFilter; import org.apache.tika.metadata.filter.MetadataFilter; -import org.apache.tika.metadata.filter.MockUpperCaseFilter; import org.apache.tika.parser.ParseContext; import org.apache.tika.pipes.api.FetchEmitTuple; import org.apache.tika.pipes.api.PipesResult; @@ -74,8 +71,11 @@ public class PipesClientTest { @Test public void testMetadataFilter(@TempDir Path tmp) throws Exception { ParseContext parseContext = new ParseContext(); - MetadataFilter metadataFilter = new CompositeMetadataFilter(List.of(new MockUpperCaseFilter())); - parseContext.set(MetadataFilter.class, metadataFilter); + // Use JSON config approach for Jackson serialization compatibility + // Don't resolve here - let PipesServer resolve on its side + parseContext.setJsonConfig("metadata-filters", """ + ["mock-upper-case-filter"] + """); PipesClient pipesClient = init(tmp, testDoc); PipesResult pipesResult = pipesClient.process( new FetchEmitTuple(testDoc, new FetchKey(fetcherName, testDoc), @@ -89,8 +89,11 @@ public class PipesClientTest { @Test public void testMetadataListFilter(@TempDir Path tmp) throws Exception { ParseContext parseContext = new ParseContext(); - MetadataFilter metadataFilter = new CompositeMetadataFilter(List.of(new AttachmentCountingListFilter())); - parseContext.set(MetadataFilter.class, metadataFilter); + // Use JSON config approach for Jackson serialization compatibility + // Don't resolve here - let PipesServer resolve on its side + parseContext.setJsonConfig("metadata-filters", """ + ["attachment-counting-list-filter"] + """); String testFile = "mock-embedded.xml"; @@ -175,8 +178,11 @@ public class PipesClientTest { //I did both manually during development, but unit tests are better. :D ParseContext parseContext = new ParseContext(); parseContext.set(TikaTaskTimeout.class, new TikaTaskTimeout(1000)); - MetadataFilter metadataFilter = new CompositeMetadataFilter(List.of(new AttachmentCountingListFilter())); - parseContext.set(MetadataFilter.class, metadataFilter); + // Use JSON config approach for Jackson serialization compatibility + // Don't resolve here - let PipesServer resolve on its side + parseContext.setJsonConfig("metadata-filters", """ + ["attachment-counting-list-filter"] + """); String testFile = "mock-timeout-10s.xml"; PipesClient pipesClient = init(tmp, testFile); diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java b/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java index b1898d5914..86ea09edee 100644 --- a/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java +++ b/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java @@ -183,7 +183,11 @@ public class JDBCEmitter extends AbstractEmitter implements Closeable { @Override public void emit(List<? extends EmitData> emitData) throws IOException { for (EmitData d : emitData) { - emit(d.getEmitKey(), d.getMetadataList(), d.getParseContext()); + ParseContext parseContext = d.getParseContext(); + if (parseContext == null) { + parseContext = new ParseContext(); + } + emit(d.getEmitKey(), d.getMetadataList(), parseContext); } }
