This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 03f88a5a402313280b3852399633114041cc1a21
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sat Nov 30 10:42:45 2024 +0100

    CAMEL-21494: camel-file/camel-ftp - Producer should be AsyncProducer based
---
 .../camel/component/file/GenericFileProducer.java   | 21 ++++++++++++++++++---
 .../component/file/remote/RemoteFileProducer.java   |  2 +-
 .../file/remote/integration/FtpLoginIT.java         |  4 ++++
 .../file/remote/integration/FtpLoginNoRetryIT.java  |  4 ++++
 .../FtpThrowExceptionOnConnectionFailedIT.java      |  4 ++++
 5 files changed, 31 insertions(+), 4 deletions(-)

diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java
index 2f5031cb70f..22f3eff0c42 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java
@@ -22,10 +22,11 @@ import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.DefaultExchange;
-import org.apache.camel.support.DefaultProducer;
 import org.apache.camel.support.LRUCacheFactory;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.service.ServiceHelper;
@@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Generic file producer
  */
-public class GenericFileProducer<T> extends DefaultProducer {
+public class GenericFileProducer<T> extends DefaultAsyncProducer {
     private static final Logger LOG = 
LoggerFactory.getLogger(GenericFileProducer.class);
 
     protected final GenericFileEndpoint<T> endpoint;
@@ -63,7 +64,18 @@ public class GenericFileProducer<T> extends DefaultProducer {
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            doProcess(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        } finally {
+            callback.done(true);
+        }
+        return true;
+    }
+
+    protected void doProcess(Exchange exchange) throws Exception {
         // store any existing file header which we want to keep and propagate
         final String existing = 
exchange.getIn().getHeader(FileConstants.FILE_NAME, String.class);
 
@@ -78,6 +90,8 @@ public class GenericFileProducer<T> extends DefaultProducer {
         lock.lock();
         try {
             processExchange(exchange, target);
+        } catch (Exception e) {
+            exchange.setException(e);
         } finally {
             // do not remove as the locks cache has an upper bound
             // this ensure the locks is appropriate reused
@@ -512,4 +526,5 @@ public class GenericFileProducer<T> extends DefaultProducer 
{
         super.doStop();
         ServiceHelper.stopService(locks);
     }
+
 }
diff --git 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
index 29c6269ee50..16e20a05957 100644
--- 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
+++ 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
@@ -50,7 +50,7 @@ public class RemoteFileProducer<T> extends 
GenericFileProducer<T> {
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
+    protected void doProcess(Exchange exchange) throws Exception {
         // store any existing file header which we want to keep and propagate
         final String existing = 
exchange.getIn().getHeader(FtpConstants.FILE_NAME, String.class);
 
diff --git 
a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpLoginIT.java
 
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpLoginIT.java
index cc97c89ec6b..87aca863193 100644
--- 
a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpLoginIT.java
+++ 
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpLoginIT.java
@@ -69,5 +69,9 @@ public class FtpLoginIT extends FtpServerTestSupport {
         producer.start();
         producer.process(exchange);
         producer.stop();
+
+        if (exchange.isFailed()) {
+            throw exchange.getException();
+        }
     }
 }
diff --git 
a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpLoginNoRetryIT.java
 
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpLoginNoRetryIT.java
index 909c9761a5d..d967405e12d 100644
--- 
a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpLoginNoRetryIT.java
+++ 
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpLoginNoRetryIT.java
@@ -60,5 +60,9 @@ public class FtpLoginNoRetryIT extends FtpServerTestSupport {
         producer.start();
         producer.process(exchange);
         producer.stop();
+
+        if (exchange.isFailed()) {
+            throw exchange.getException();
+        }
     }
 }
diff --git 
a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpThrowExceptionOnConnectionFailedIT.java
 
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpThrowExceptionOnConnectionFailedIT.java
index 68eeebff422..fcf9f1f8470 100644
--- 
a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpThrowExceptionOnConnectionFailedIT.java
+++ 
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpThrowExceptionOnConnectionFailedIT.java
@@ -60,6 +60,10 @@ public class FtpThrowExceptionOnConnectionFailedIT extends 
FtpServerTestSupport
         producer.start();
         producer.process(exchange);
         producer.stop();
+
+        if (exchange.isFailed()) {
+            throw exchange.getException();
+        }
     }
 
 }

Reply via email to