This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-4628
in repository https://gitbox.apache.org/repos/asf/tika.git

commit a81d64dcded0f8025dc4d37eb955c4084df8472c
Author: tallison <[email protected]>
AuthorDate: Thu Jan 22 15:31:39 2026 -0500

    TIKA-4628 -- improve pipesClient+pipesServer ipc: critical socket.setTcpNoDelay(true) and migrate to pure jackson serialization
---
 .../tika/pipes/api/emitter/AbstractEmitter.java    |  7 +++-
 .../pipes/api/emitter/AbstractStreamEmitter.java   |  7 +++-
 .../apache/tika/pipes/api/emitter/EmitData.java    |  6 ++-
 .../org/apache/tika/pipes/core/PipesClient.java    | 44 +++++++++-----------
 .../tika/pipes/core/emitter/EmitDataImpl.java      | 32 +++++++--------
 .../apache/tika/pipes/core/server/PipesServer.java | 48 ++++++++--------------
 .../apache/tika/pipes/core/PassbackFilterTest.java | 24 ++---------
 .../apache/tika/pipes/core/PipesClientTest.java    | 24 +++++++----
 .../tika/pipes/emitter/jdbc/JDBCEmitter.java       |  6 ++-
 9 files changed, 93 insertions(+), 105 deletions(-)

diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java
index 324d16ace9..250b3df9d9 100644
--- a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java
+++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java
@@ -19,6 +19,7 @@ package org.apache.tika.pipes.api.emitter;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.tika.parser.ParseContext;
 import org.apache.tika.plugins.AbstractTikaExtension;
 import org.apache.tika.plugins.ExtensionConfig;
 
@@ -31,7 +32,11 @@ public abstract class AbstractEmitter extends 
AbstractTikaExtension implements E
     @Override
     public void emit(List<? extends EmitData> emitData) throws IOException {
         for (EmitData item : emitData) {
-            emit(item.getEmitKey(), item.getMetadataList(), 
item.getParseContext());
+            ParseContext parseContext = item.getParseContext();
+            if (parseContext == null) {
+                parseContext = new ParseContext();
+            }
+            emit(item.getEmitKey(), item.getMetadataList(), parseContext);
         }
     }
 }
diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java
index 892180d8c7..98727205f1 100644
--- a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java
+++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java
@@ -19,6 +19,7 @@ package org.apache.tika.pipes.api.emitter;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.tika.parser.ParseContext;
 import org.apache.tika.plugins.AbstractTikaExtension;
 import org.apache.tika.plugins.ExtensionConfig;
 
@@ -31,7 +32,11 @@ public abstract class AbstractStreamEmitter extends 
AbstractTikaExtension implem
     @Override
     public void emit(List<? extends EmitData> emitData) throws IOException {
         for (EmitData item : emitData) {
-            emit(item.getEmitKey(), item.getMetadataList(), 
item.getParseContext());
+            ParseContext parseContext = item.getParseContext();
+            if (parseContext == null) {
+                parseContext = new ParseContext();
+            }
+            emit(item.getEmitKey(), item.getMetadataList(), parseContext);
         }
     }
 }
diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java
index 6af852a63b..ec6266ed23 100644
--- a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java
+++ b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java
@@ -30,6 +30,10 @@ public interface EmitData {
 
     long getEstimatedSizeBytes();
 
+    /**
+     * Gets the ParseContext. This is not serialized over IPC - it's restored
+     * by PipesClient after deserialization from the original FetchEmitTuple.
+     * May return null if not set.
+     */
     ParseContext getParseContext();
-
 }
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index e79c35ccfe..800930aba9 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -24,13 +24,14 @@ import static 
org.apache.tika.pipes.core.PipesClient.COMMANDS.ACK;
 import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.FINISHED;
 import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.READY;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
@@ -49,7 +50,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
 import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +62,7 @@ import org.apache.tika.pipes.api.FetchEmitTuple;
 import org.apache.tika.pipes.api.PipesResult;
 import org.apache.tika.pipes.api.emitter.EmitKey;
 import org.apache.tika.pipes.core.emitter.EmitDataImpl;
+import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
 import org.apache.tika.pipes.core.server.IntermediateResult;
 import org.apache.tika.pipes.core.server.PipesServer;
 import org.apache.tika.utils.ExceptionUtils;
@@ -254,14 +255,7 @@ public class PipesClient implements Closeable {
 
     private void writeTask(FetchEmitTuple t) throws IOException {
         LOG.debug("pipesClientId={}: sending NEW_REQUEST for id={}", 
pipesClientId, t.getId());
-        UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream
-                .builder()
-                .get();
-        try (ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(bos)) {
-            objectOutputStream.writeObject(t);
-        }
-
-        byte[] bytes = bos.toByteArray();
+        byte[] bytes = JsonPipesIpc.toBytes(t);
         serverTuple.output.write(COMMANDS.NEW_REQUEST.getByte());
         serverTuple.output.writeInt(bytes.length);
         serverTuple.output.write(bytes);
@@ -312,7 +306,12 @@ public class PipesClient implements Closeable {
                         lastUpdate = Instant.now();
                         break;
                     case FINISHED:
-                        return readResult(PipesResult.class);
+                        PipesResult result = readResult(PipesResult.class);
+                        // Restore ParseContext from original FetchEmitTuple 
(not serialized back from server)
+                        if (result.emitData() instanceof EmitDataImpl 
emitDataImpl) {
+                            emitDataImpl.setParseContext(t.getParseContext());
+                        }
+                        return result;
                 }
             } catch (SocketTimeoutException e) {
                 LOG.warn("clientId={}: Socket timeout exception while waiting 
for server", pipesClientId, e);
@@ -419,19 +418,12 @@ public class PipesClient implements Closeable {
         return status;
     }
 
-    private <T> T readResult(Class<? extends T> clazz) throws IOException {
+    private <T> T readResult(Class<T> clazz) throws IOException {
         int len = serverTuple.input.readInt();
         byte[] bytes = new byte[len];
         serverTuple.input.readFully(bytes);
         writeAck();
-        try (ObjectInputStream objectInputStream = new 
ObjectInputStream(UnsynchronizedByteArrayInputStream
-                .builder()
-                .setByteArray(bytes)
-                .get())) {
-            return clazz.cast(objectInputStream.readObject());
-        } catch (ClassNotFoundException e) {
-            throw new IOException(e);
-        }
+        return JsonPipesIpc.fromBytes(bytes, clazz);
     }
 
     private void writeAck() throws IOException {
@@ -441,7 +433,9 @@ public class PipesClient implements Closeable {
 
 
     private void restart() throws InterruptedException, IOException, 
TimeoutException {
-        ServerSocket serverSocket = new ServerSocket(0, 50, 
InetAddress.getLoopbackAddress());
+        ServerSocket serverSocket = new ServerSocket();
+        serverSocket.setReuseAddress(true);
+        serverSocket.bind(new 
InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 50);
         int port = serverSocket.getLocalPort();
         if (serverTuple != null && serverTuple.process != null) {
             int oldPort = serverTuple.serverSocket.getLocalPort();
@@ -492,8 +486,10 @@ public class PipesClient implements Closeable {
             }
         }
         socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs());
-        serverTuple = new ServerTuple(process, serverSocket, socket, new 
DataInputStream(socket.getInputStream()),
-                new DataOutputStream(socket.getOutputStream()), tmpDir);
+        socket.setTcpNoDelay(true);
+        serverTuple = new ServerTuple(process, serverSocket, socket,
+                new DataInputStream(new 
BufferedInputStream(socket.getInputStream())),
+                new DataOutputStream(new 
BufferedOutputStream(socket.getOutputStream())), tmpDir);
         waitForStartup();
     }
 
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java
index 1aee991f11..930d594918 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.tika.pipes.core.emitter;
 
-import java.io.Serializable;
 import java.util.List;
 
 import org.apache.tika.metadata.Metadata;
@@ -24,31 +23,23 @@ import org.apache.tika.parser.ParseContext;
 import org.apache.tika.pipes.api.emitter.EmitData;
 import org.apache.tika.utils.StringUtils;
 
-public class EmitDataImpl implements Serializable, EmitData {
-    /**
-     * Serial version UID
-     */
-    private static final long serialVersionUID = -3861669115439125268L;
+public class EmitDataImpl implements EmitData {
 
     private final String emitKey;
     private final List<Metadata> metadataList;
     private final String containerStackTrace;
-    private ParseContext parseContext = null;
+    // ParseContext is not serialized - it's set by PipesClient after 
deserialization
+    private ParseContext parseContext;
 
     public EmitDataImpl(String emitKey, List<Metadata> metadataList) {
         this(emitKey, metadataList, StringUtils.EMPTY);
     }
 
     public EmitDataImpl(String emitKey, List<Metadata> metadataList, String 
containerStackTrace) {
-        this(emitKey, metadataList, containerStackTrace, new ParseContext());
-    }
-
-    public EmitDataImpl(String emitKey, List<Metadata> metadataList, String 
containerStackTrace, ParseContext parseContext) {
         this.emitKey = emitKey;
         this.metadataList = metadataList;
         this.containerStackTrace = (containerStackTrace == null) ? 
StringUtils.EMPTY :
                 containerStackTrace;
-        this.parseContext = parseContext;
     }
 
     public String getEmitKey() {
@@ -67,14 +58,23 @@ public class EmitDataImpl implements Serializable, EmitData 
{
         return estimateSizeInBytes(getEmitKey(), getMetadataList(), 
containerStackTrace);
     }
 
-    public void setParseContext(ParseContext parseContext) {
-        this.parseContext = parseContext;
-    }
-
+    /**
+     * Gets the ParseContext. This is not serialized - it's set by PipesClient
+     * after deserialization from the original FetchEmitTuple.
+     */
+    @Override
     public ParseContext getParseContext() {
         return parseContext;
     }
 
+    /**
+     * Sets the ParseContext. Called by PipesClient after deserialization
+     * to restore the ParseContext from the original FetchEmitTuple.
+     */
+    public void setParseContext(ParseContext parseContext) {
+        this.parseContext = parseContext;
+    }
+
     private static long estimateSizeInBytes(String id, List<Metadata> 
metadataList,
                                             String containerStackTrace) {
         long sz = 36 + id.length() * 2;
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
index 66d5d9ae50..8ac17d1b27 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
@@ -22,12 +22,11 @@ import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.IN
 import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.OOM;
 import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.TIMEOUT;
 
-import java.io.ByteArrayOutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -47,8 +46,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
-import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXException;
@@ -74,6 +71,7 @@ import org.apache.tika.pipes.core.config.ConfigStore;
 import org.apache.tika.pipes.core.config.ConfigStoreFactory;
 import org.apache.tika.pipes.core.emitter.EmitterManager;
 import org.apache.tika.pipes.core.fetcher.FetcherManager;
+import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
 import org.apache.tika.plugins.ExtensionConfig;
 import org.apache.tika.plugins.TikaPluginManager;
 import org.apache.tika.sax.ContentHandlerFactory;
@@ -166,8 +164,8 @@ public class PipesServer implements AutoCloseable {
             Socket socket = new Socket();
             socket.connect(new 
InetSocketAddress(InetAddress.getLoopbackAddress(), port), 
PipesClient.SOCKET_CONNECT_TIMEOUT_MS);
 
-            DataInputStream dis = new DataInputStream(socket.getInputStream());
-            DataOutputStream dos = new 
DataOutputStream(socket.getOutputStream());
+            DataInputStream dis = new DataInputStream(new 
BufferedInputStream(socket.getInputStream()));
+            DataOutputStream dos = new DataOutputStream(new 
BufferedOutputStream(socket.getOutputStream()));
         try {
             TikaLoader tikaLoader = TikaLoader.load(tikaConfigPath);
             TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig();
@@ -175,6 +173,7 @@ public class PipesServer implements AutoCloseable {
 
             // Set socket timeout from config after loading PipesConfig
             socket.setSoTimeout((int) pipesConfig.getSocketTimeoutMs());
+            socket.setTcpNoDelay(true);
 
             MetadataFilter metadataFilter = tikaLoader.loadMetadataFilters();
             ContentHandlerFactory contentHandlerFactory = 
tikaLoader.loadContentHandlerFactory();
@@ -409,10 +408,9 @@ public class PipesServer implements AutoCloseable {
     private void handleCrash(PROCESSING_STATUS processingStatus, String id, 
Throwable t) {
         LOG.error("{}: {}", processingStatus, id, t);
         String msg = (t != null) ? ExceptionUtils.getStackTrace(t) : "";
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
-            oos.writeObject(msg);
-            write(processingStatus, bos.toByteArray());
+        try {
+            byte[] bytes = JsonPipesIpc.toBytes(msg);
+            write(processingStatus, bytes);
             awaitAck();
         } catch (IOException e) {
             //swallow
@@ -443,18 +441,12 @@ public class PipesServer implements AutoCloseable {
             int length = input.readInt();
             byte[] bytes = new byte[length];
             input.readFully(bytes);
-            try (ObjectInputStream objectInputStream = new ObjectInputStream(
-                    
UnsynchronizedByteArrayInputStream.builder().setByteArray(bytes).get())) {
-                return (FetchEmitTuple) objectInputStream.readObject();
-            }
+            return JsonPipesIpc.fromBytes(bytes, FetchEmitTuple.class);
         } catch (IOException e) {
-            LOG.error("problem reading tuple", e);
-            exit(1);
-        } catch (ClassNotFoundException e) {
-            LOG.error("can't find class?!", e);
-            exit(1);
+            LOG.error("problem reading/deserializing FetchEmitTuple", e);
+            handleCrash(PROCESSING_STATUS.UNSPECIFIED_CRASH, "unknown", e);
         }
-        //unreachable, no?!
+        //unreachable - handleCrash calls exit
         return null;
     }
 
@@ -516,11 +508,8 @@ public class PipesServer implements AutoCloseable {
 
     private void write(PROCESSING_STATUS processingStatus, PipesResult 
pipesResult) {
         try {
-            UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
-            try (ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(bos)) {
-                objectOutputStream.writeObject(pipesResult);
-            }
-            write(processingStatus, bos.toByteArray());
+            byte[] bytes = JsonPipesIpc.toBytes(pipesResult);
+            write(processingStatus, bytes);
         } catch (IOException e) {
             LOG.error("problem writing emit data (forking process shutdown?)", 
e);
             exit(1);
@@ -529,11 +518,8 @@ public class PipesServer implements AutoCloseable {
 
     private void writeIntermediate(Metadata metadata) {
         try {
-            UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
-            try (ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(bos)) {
-                objectOutputStream.writeObject(metadata);
-            }
-            write(INTERMEDIATE_RESULT, bos.toByteArray());
+            byte[] bytes = JsonPipesIpc.toBytes(metadata);
+            write(INTERMEDIATE_RESULT, bytes);
         } catch (IOException e) {
             LOG.error("problem writing intermediate data (forking process 
shutdown?)", e);
             exit(1);
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
index a3a55bb372..853136855d 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
@@ -23,14 +23,12 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
-import java.util.Locale;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.tika.config.loader.TikaJsonConfig;
-import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.parser.ParseContext;
@@ -39,7 +37,6 @@ import org.apache.tika.pipes.api.PipesResult;
 import org.apache.tika.pipes.api.emitter.EmitKey;
 import org.apache.tika.pipes.api.fetcher.FetchKey;
 import org.apache.tika.serialization.JsonMetadataList;
-import org.apache.tika.utils.StringUtils;
 
 public class PassbackFilterTest {
 
@@ -63,7 +60,9 @@ public class PassbackFilterTest {
         init(tmpDir);
         String emitFileBase = "blah";
         ParseContext parseContext = new ParseContext();
-        parseContext.set(PassbackFilter.class, new MyPassbackFilter());
+        // Use JSON config approach for Jackson serialization compatibility
+        // Don't resolve here - let PipesServer resolve on its side
+        parseContext.setJsonConfig("mock-passback-filter", "{}");
         PipesResult pipesResult = pipesClient.process(
                 new FetchEmitTuple(testPdfFile, new FetchKey(fetcherId, 
testPdfFile),
                         new EmitKey(emitterId, emitFileBase), new Metadata(), 
parseContext,
@@ -96,21 +95,4 @@ public class PassbackFilterTest {
                 .get(0)
                 .get(Metadata.CONTENT_LENGTH));
     }
-
-    private static class MyPassbackFilter extends PassbackFilter {
-        @Override
-        public void filter(List<Metadata> metadataList) throws TikaException {
-            // Remove items without RESOURCE_NAME_KEY and transform remaining 
ones
-            metadataList.removeIf(m -> 
StringUtils.isBlank(m.get(TikaCoreProperties.RESOURCE_NAME_KEY)));
-            for (Metadata m : metadataList) {
-                String val = m.get(TikaCoreProperties.RESOURCE_NAME_KEY);
-                // Clear all fields and only keep RESOURCE_NAME_KEY 
(uppercased)
-                for (String name : m.names()) {
-                    m.remove(name);
-                }
-                m.set(TikaCoreProperties.RESOURCE_NAME_KEY, 
val.toUpperCase(Locale.ROOT));
-            }
-        }
-    }
-
 }
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
index 1cba2622ac..ee18f7a369 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
@@ -22,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.List;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -32,10 +31,8 @@ import org.apache.tika.config.TikaTaskTimeout;
 import org.apache.tika.config.loader.TikaJsonConfig;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.filter.AttachmentCountingListFilter;
 import org.apache.tika.metadata.filter.CompositeMetadataFilter;
 import org.apache.tika.metadata.filter.MetadataFilter;
-import org.apache.tika.metadata.filter.MockUpperCaseFilter;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.pipes.api.FetchEmitTuple;
 import org.apache.tika.pipes.api.PipesResult;
@@ -74,8 +71,11 @@ public class PipesClientTest {
     @Test
     public void testMetadataFilter(@TempDir Path tmp) throws Exception {
         ParseContext parseContext = new ParseContext();
-        MetadataFilter metadataFilter = new 
CompositeMetadataFilter(List.of(new MockUpperCaseFilter()));
-        parseContext.set(MetadataFilter.class, metadataFilter);
+        // Use JSON config approach for Jackson serialization compatibility
+        // Don't resolve here - let PipesServer resolve on its side
+        parseContext.setJsonConfig("metadata-filters", """
+            ["mock-upper-case-filter"]
+        """);
         PipesClient pipesClient = init(tmp, testDoc);
         PipesResult pipesResult = pipesClient.process(
                 new FetchEmitTuple(testDoc, new FetchKey(fetcherName, testDoc),
@@ -89,8 +89,11 @@ public class PipesClientTest {
     @Test
     public void testMetadataListFilter(@TempDir Path tmp) throws Exception {
         ParseContext parseContext = new ParseContext();
-        MetadataFilter metadataFilter = new 
CompositeMetadataFilter(List.of(new AttachmentCountingListFilter()));
-        parseContext.set(MetadataFilter.class, metadataFilter);
+        // Use JSON config approach for Jackson serialization compatibility
+        // Don't resolve here - let PipesServer resolve on its side
+        parseContext.setJsonConfig("metadata-filters", """
+            ["attachment-counting-list-filter"]
+        """);
 
         String testFile = "mock-embedded.xml";
 
@@ -175,8 +178,11 @@ public class PipesClientTest {
         //I did both manually during development, but unit tests are better. :D
         ParseContext parseContext = new ParseContext();
         parseContext.set(TikaTaskTimeout.class, new TikaTaskTimeout(1000));
-        MetadataFilter metadataFilter = new 
CompositeMetadataFilter(List.of(new AttachmentCountingListFilter()));
-        parseContext.set(MetadataFilter.class, metadataFilter);
+        // Use JSON config approach for Jackson serialization compatibility
+        // Don't resolve here - let PipesServer resolve on its side
+        parseContext.setJsonConfig("metadata-filters", """
+            ["attachment-counting-list-filter"]
+        """);
 
         String testFile = "mock-timeout-10s.xml";
         PipesClient pipesClient = init(tmp, testFile);
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java b/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
index b1898d5914..86ea09edee 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
@@ -183,7 +183,11 @@ public class JDBCEmitter extends AbstractEmitter 
implements Closeable {
     @Override
     public void emit(List<? extends EmitData> emitData) throws IOException {
         for (EmitData d : emitData) {
-            emit(d.getEmitKey(), d.getMetadataList(), d.getParseContext());
+            ParseContext parseContext = d.getParseContext();
+            if (parseContext == null) {
+                parseContext = new ParseContext();
+            }
+            emit(d.getEmitKey(), d.getMetadataList(), parseContext);
         }
     }
 

Reply via email to