This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.8.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.8.x by this push:
     new 6b3fa4c734c CAMEL-21830: camel-file - Using consumer template to 
consume a single file issue with idempotentEager (#17352)
6b3fa4c734c is described below

commit 6b3fa4c734cea44cc7f3f0dfeebcff9dea16aeab
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Mar 5 05:55:23 2025 +0000

    CAMEL-21830: camel-file - Using consumer template to consume a single file 
issue with idempotentEager (#17352)
---
 .../camel/component/file/GenericFileConsumer.java  | 17 +++++-
 .../FileConsumerTemplateIdempotentEagerTest.java   | 60 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 1 deletion(-)

diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 9774e748fdd..6e111228e5c 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -267,13 +267,28 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
         while (exchanges.size() > limit) {
             // must remove last
             Exchange exchange = exchanges.removeLast();
-            GenericFile<?> file = 
exchange.getProperty(ExchangePropertyKey.FILE_EXCHANGE_FILE, GenericFile.class);
+            GenericFile file = 
exchange.getProperty(ExchangePropertyKey.FILE_EXCHANGE_FILE, GenericFile.class);
             String key = file.getAbsoluteFilePath();
             endpoint.getInProgressRepository().remove(key);
+            // if we added eager to idempotent then we need to remove this
+            if (endpoint.isIdempotentEager() && 
endpoint.getIdempotentRepository() != null) {
+                removeExcessiveIdempotentFile(file, null);
+            }
             releaseExchange(exchange, true);
         }
     }
 
+    void removeExcessiveIdempotentFile(GenericFile file, Exchange dynamic) {
+        String key = file.getAbsoluteFilePath();
+        if (endpoint.getIdempotentKey() != null) {
+            Exchange dummy = GenericFileHelper.createDummy(endpoint, dynamic, 
() -> file);
+            key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
+        }
+        if (key != null) {
+            endpoint.getIdempotentRepository().remove(key);
+        }
+    }
+
     /**
      * Drain any in progress files as we are done with the files
      *
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateIdempotentEagerTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateIdempotentEagerTest.java
new file mode 100644
index 00000000000..f6eeff87770
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerTemplateIdempotentEagerTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import 
org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class FileConsumerTemplateIdempotentEagerTest extends 
ContextTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Test
+    public void testConsumerTemplate() throws Exception {
+        context.start();
+
+        String uri = fileUri() + "?noop=true";
+
+        template.sendBodyAndHeader(uri, "one", Exchange.FILE_NAME, "1.txt");
+        template.sendBodyAndHeader(uri, "two", Exchange.FILE_NAME, "2.txt");
+
+        Exchange e1 = consumer().receive(uri, 5000);
+        Assertions.assertNotNull(e1);
+        String b1 = e1.getMessage().getBody(String.class);
+        Assertions.assertTrue("one".equals(b1) || "two".equals(b1), "Should 
either be one or two, was: " + b1);
+        consumer().doneUoW(e1);
+
+        FileEndpoint fe = context.getEndpoint(uri, FileEndpoint.class);
+        MemoryIdempotentRepository repo = (MemoryIdempotentRepository) 
fe.getIdempotentRepository();
+        Assertions.assertEquals(1, repo.getCacheSize());
+
+        Exchange e2 = consumer().receive(uri, 5000);
+        Assertions.assertNotNull(e2);
+        String b2 = e2.getMessage().getBody(String.class);
+        Assertions.assertTrue("one".equals(b2) || "two".equals(b2), "Should 
either be one or two, was: " + b2);
+        consumer().doneUoW(e2);
+        Assertions.assertEquals(2, repo.getCacheSize());
+
+        Assertions.assertNotEquals(b1, b2);
+    }
+}

Reply via email to