This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch branch_3x
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/branch_3x by this push:
     new 5a092123c8 basically cherrypick 51feb980fc4289fc90ed8cb1d22a42d6c2729bd5
5a092123c8 is described below

commit 5a092123c8954e49ff2ebd3696ae6d6c2d724937
Author: tallison <[email protected]>
AuthorDate: Thu Mar 5 15:00:51 2026 -0500

    basically cherrypick 51feb980fc4289fc90ed8cb1d22a42d6c2729bd5
---
 .../apache/tika/pipes/async/AsyncProcessor.java    | 28 +++++++++++++++++-----
 1 file changed, 22 insertions(+), 6 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 3a6751f4ff..fe532e9dc3 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -178,16 +178,32 @@ public class AsyncProcessor implements Closeable {
         return fetchEmitTuples.remainingCapacity();
     }
 
-    public synchronized boolean offer(FetchEmitTuple t, long offerMs)
+    public boolean offer(FetchEmitTuple t, long offerMs)
             throws PipesException, InterruptedException {
         if (fetchEmitTuples == null) {
             throw new IllegalStateException("queue hasn't been initialized yet.");
-        } else if (isShuttingDown) {
-            throw new IllegalStateException(
-                    "Can't call offer after calling close() or " + "shutdownNow()");
         }
-        checkActive();
-        return fetchEmitTuples.offer(t, offerMs, TimeUnit.MILLISECONDS);
+        long deadline = System.currentTimeMillis() + offerMs;
+        while (System.currentTimeMillis() < deadline) {
+            synchronized (this) {
+                if (isShuttingDown) {
+                    throw new IllegalStateException(
+                            "Can't call offer after calling close() or shutdownNow()");
+                }
+                checkActive();
+            }
+            // Try a short offer outside the synchronized block so checkActive()
+            // can still be called by other threads (e.g. the watcher).
+            long remaining = deadline - System.currentTimeMillis();
+            long pollMs = Math.min(remaining, 1000);
+            if (pollMs <= 0) {
+                return false;
+            }
+            if (fetchEmitTuples.offer(t, pollMs, TimeUnit.MILLISECONDS)) {
+                return true;
+            }
+        }
+        return false;
     }
 
     public void finished() throws InterruptedException {

Reply via email to