This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 03f88a5a402313280b3852399633114041cc1a21 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Nov 30 10:42:45 2024 +0100 CAMEL-21494: camel-file/camel-ftp - Producer should be AsyncProducer based --- .../camel/component/file/GenericFileProducer.java | 21 ++++++++++++++++++--- .../component/file/remote/RemoteFileProducer.java | 2 +- .../file/remote/integration/FtpLoginIT.java | 4 ++++ .../file/remote/integration/FtpLoginNoRetryIT.java | 4 ++++ .../FtpThrowExceptionOnConnectionFailedIT.java | 4 ++++ 5 files changed, 31 insertions(+), 4 deletions(-) diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java index 2f5031cb70f..22f3eff0c42 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java @@ -22,10 +22,11 @@ import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Expression; +import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.DefaultExchange; -import org.apache.camel.support.DefaultProducer; import org.apache.camel.support.LRUCacheFactory; import org.apache.camel.support.MessageHelper; import org.apache.camel.support.service.ServiceHelper; @@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory; /** * Generic file producer */ -public class GenericFileProducer<T> extends DefaultProducer { +public class GenericFileProducer<T> extends DefaultAsyncProducer { private static final Logger LOG = LoggerFactory.getLogger(GenericFileProducer.class); protected final GenericFileEndpoint<T> endpoint; @@ -63,7 +64,18 @@ public class GenericFileProducer<T> extends DefaultProducer { } @Override - public void process(Exchange exchange) throws Exception { + public boolean process(Exchange exchange, AsyncCallback callback) { + try { + doProcess(exchange); + } catch (Exception e) { + exchange.setException(e); + } finally { + callback.done(true); + } + return true; + } + + protected void doProcess(Exchange exchange) throws Exception { // store any existing file header which we want to keep and propagate final String existing = exchange.getIn().getHeader(FileConstants.FILE_NAME, String.class); @@ -78,6 +90,8 @@ public class GenericFileProducer<T> extends DefaultProducer { lock.lock(); try { processExchange(exchange, target); + } catch (Exception e) { + exchange.setException(e); } finally { // do not remove as the locks cache has an upper bound // this ensure the locks is appropriate reused @@ -512,4 +526,5 @@ public class GenericFileProducer<T> extends DefaultProducer { super.doStop(); ServiceHelper.stopService(locks); } + } diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java index 29c6269ee50..16e20a05957 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java @@ -50,7 +50,7 @@ public class RemoteFileProducer<T> extends GenericFileProducer<T> { } @Override - public void process(Exchange exchange) throws Exception { + protected void doProcess(Exchange exchange) throws Exception { // store any existing file header which we want to keep and propagate final String existing = exchange.getIn().getHeader(FtpConstants.FILE_NAME, String.class); diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpLoginIT.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpLoginIT.java index cc97c89ec6b..87aca863193 100644 --- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpLoginIT.java +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpLoginIT.java @@ -69,5 +69,9 @@ public class FtpLoginIT extends FtpServerTestSupport { producer.start(); producer.process(exchange); producer.stop(); + + if (exchange.isFailed()) { + throw exchange.getException(); + } } } diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpLoginNoRetryIT.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpLoginNoRetryIT.java index 909c9761a5d..d967405e12d 100644 --- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpLoginNoRetryIT.java +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpLoginNoRetryIT.java @@ -60,5 +60,9 @@ public class FtpLoginNoRetryIT extends FtpServerTestSupport { producer.start(); producer.process(exchange); producer.stop(); + + if (exchange.isFailed()) { + throw exchange.getException(); + } } } diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpThrowExceptionOnConnectionFailedIT.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpThrowExceptionOnConnectionFailedIT.java index 68eeebff422..fcf9f1f8470 100644 --- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpThrowExceptionOnConnectionFailedIT.java +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpThrowExceptionOnConnectionFailedIT.java @@ -60,6 +60,10 @@ public class FtpThrowExceptionOnConnectionFailedIT extends FtpServerTestSupport producer.start(); producer.process(exchange); producer.stop(); + + if (exchange.isFailed()) { + throw exchange.getException(); + } } }