This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-4.10.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.10.x by this push: new f43333e2bf3 CAMEL-21830: camel-file - Using consumer template to consume a single file issue with idempotentEager (#17352) f43333e2bf3 is described below commit f43333e2bf3110bd7d6c924f4fe4e75a92feea66 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Mar 5 05:55:23 2025 +0000 CAMEL-21830: camel-file - Using consumer template to consume a single file issue with idempotentEager (#17352) --- .../camel/component/file/GenericFileConsumer.java | 17 +++++- .../FileConsumerTemplateIdempotentEagerTest.java | 60 ++++++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java index 7f87cb2af1d..b2273f7e55c 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java @@ -269,13 +269,28 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum while (exchanges.size() > limit) { // must remove last Exchange exchange = exchanges.removeLast(); - GenericFile<?> file = exchange.getProperty(ExchangePropertyKey.FILE_EXCHANGE_FILE, GenericFile.class); + GenericFile file = exchange.getProperty(ExchangePropertyKey.FILE_EXCHANGE_FILE, GenericFile.class); String key = file.getAbsoluteFilePath(); endpoint.getInProgressRepository().remove(key); + // if we added eager to idempotent then we need to remove this + if (endpoint.isIdempotentEager() && endpoint.getIdempotentRepository() != null) { + removeExcessiveIdempotentFile(file, null); + } releaseExchange(exchange, true); } } + void removeExcessiveIdempotentFile(GenericFile file, Exchange dynamic) { + String key = file.getAbsoluteFilePath(); + if (endpoint.getIdempotentKey() != null) { + Exchange dummy = GenericFileHelper.createDummy(endpoint, dynamic, () -> file); + key = endpoint.getIdempotentKey().evaluate(dummy, String.class); + } + if (key != null) { + endpoint.getIdempotentRepository().remove(key); + } + } + /** * Drain any in progress files as we are done with the files * diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateIdempotentEagerTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateIdempotentEagerTest.java new file mode 100644 index 00000000000..f6eeff87770 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateIdempotentEagerTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class FileConsumerTemplateIdempotentEagerTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testConsumerTemplate() throws Exception { + context.start(); + + String uri = fileUri() + "?noop=true"; + + template.sendBodyAndHeader(uri, "one", Exchange.FILE_NAME, "1.txt"); + template.sendBodyAndHeader(uri, "two", Exchange.FILE_NAME, "2.txt"); + + Exchange e1 = consumer().receive(uri, 5000); + Assertions.assertNotNull(e1); + String b1 = e1.getMessage().getBody(String.class); + Assertions.assertTrue("one".equals(b1) || "two".equals(b1), "Should either be one or two, was: " + b1); + consumer().doneUoW(e1); + + FileEndpoint fe = context.getEndpoint(uri, FileEndpoint.class); + MemoryIdempotentRepository repo = (MemoryIdempotentRepository) fe.getIdempotentRepository(); + Assertions.assertEquals(1, repo.getCacheSize()); + + Exchange e2 = consumer().receive(uri, 5000); + Assertions.assertNotNull(e2); + String b2 = e2.getMessage().getBody(String.class); + Assertions.assertTrue("one".equals(b2) || "two".equals(b2), "Should either be one or two, was: " + b2); + consumer().doneUoW(e2); + Assertions.assertEquals(2, repo.getCacheSize()); + + Assertions.assertNotEquals(b1, b2); + } +}