This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.8.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.8.x by this push:
new 6b3fa4c734c CAMEL-21830: camel-file - Using consumer template to
consume a single file issue with idempotentEager (#17352)
6b3fa4c734c is described below
commit 6b3fa4c734cea44cc7f3f0dfeebcff9dea16aeab
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Mar 5 05:55:23 2025 +0000
CAMEL-21830: camel-file - Using consumer template to consume a single file
issue with idempotentEager (#17352)
---
.../camel/component/file/GenericFileConsumer.java | 17 +++++-
.../FileConsumerTemplateIdempotentEagerTest.java | 60 ++++++++++++++++++++++
2 files changed, 76 insertions(+), 1 deletion(-)
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 9774e748fdd..6e111228e5c 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -267,13 +267,28 @@ public abstract class GenericFileConsumer<T> extends
ScheduledBatchPollingConsum
while (exchanges.size() > limit) {
// must remove last
Exchange exchange = exchanges.removeLast();
- GenericFile<?> file =
exchange.getProperty(ExchangePropertyKey.FILE_EXCHANGE_FILE, GenericFile.class);
+ GenericFile file =
exchange.getProperty(ExchangePropertyKey.FILE_EXCHANGE_FILE, GenericFile.class);
String key = file.getAbsoluteFilePath();
endpoint.getInProgressRepository().remove(key);
+ // if we added eager to idempotent then we need to remove this
+ if (endpoint.isIdempotentEager() &&
endpoint.getIdempotentRepository() != null) {
+ removeExcessiveIdempotentFile(file, null);
+ }
releaseExchange(exchange, true);
}
}
+ void removeExcessiveIdempotentFile(GenericFile file, Exchange dynamic) {
+ String key = file.getAbsoluteFilePath();
+ if (endpoint.getIdempotentKey() != null) {
+ Exchange dummy = GenericFileHelper.createDummy(endpoint, dynamic,
() -> file);
+ key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
+ }
+ if (key != null) {
+ endpoint.getIdempotentRepository().remove(key);
+ }
+ }
+
/**
* Drain any in progress files as we are done with the files
*
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateIdempotentEagerTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateIdempotentEagerTest.java
new file mode 100644
index 00000000000..f6eeff87770
--- /dev/null
+++
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateIdempotentEagerTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import
org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class FileConsumerTemplateIdempotentEagerTest extends
ContextTestSupport {
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ @Test
+ public void testConsumerTemplate() throws Exception {
+ context.start();
+
+ String uri = fileUri() + "?noop=true";
+
+ template.sendBodyAndHeader(uri, "one", Exchange.FILE_NAME, "1.txt");
+ template.sendBodyAndHeader(uri, "two", Exchange.FILE_NAME, "2.txt");
+
+ Exchange e1 = consumer().receive(uri, 5000);
+ Assertions.assertNotNull(e1);
+ String b1 = e1.getMessage().getBody(String.class);
+ Assertions.assertTrue("one".equals(b1) || "two".equals(b1), "Should
either be one or two, was: " + b1);
+ consumer().doneUoW(e1);
+
+ FileEndpoint fe = context.getEndpoint(uri, FileEndpoint.class);
+ MemoryIdempotentRepository repo = (MemoryIdempotentRepository)
fe.getIdempotentRepository();
+ Assertions.assertEquals(1, repo.getCacheSize());
+
+ Exchange e2 = consumer().receive(uri, 5000);
+ Assertions.assertNotNull(e2);
+ String b2 = e2.getMessage().getBody(String.class);
+ Assertions.assertTrue("one".equals(b2) || "two".equals(b2), "Should
either be one or two, was: " + b2);
+ consumer().doneUoW(e2);
+ Assertions.assertEquals(2, repo.getCacheSize());
+
+ Assertions.assertNotEquals(b1, b2);
+ }
+}