This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch eager in repository https://gitbox.apache.org/repos/asf/camel.git
commit 803cead2045e6494bfbf5d37f536dffad75b3b28
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Fri Apr 11 13:41:18 2025 +0200

    CAMEL-21947: camel-file/camel-ftp: If read lock is not acquired then file is not removed from idempotent repo if eager=true. WIP patch.
---
 .../java/org/apache/camel/component/file/GenericFileConsumer.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index d4bece9f506..89b676b33a8 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.file;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
@@ -227,6 +228,8 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
             total = maxMessagesPerPoll;
         }
 
+        Queue<Object> notStarted = new ArrayDeque<>();
+
         for (int index = 0; index < total && isBatchAllowed(); index++) {
             // only loop if we are started (allowed to run)
             // use poll to remove the head so it does not consume memory even
@@ -253,10 +256,13 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
             // if we did not start process the file then decrement the counter
             if (!started) {
                 answer--;
+                // this exchange was not started processing so remember to release it afterward
+                notStarted.add(exchange);
             }
         }
 
         // drain any in progress files as we are done with this batch
+        removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) notStarted, Exchange.class), 0);
         removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0);
 
         return answer;