This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 27513425c505 CAMEL-20919: camel-ftp - Add producer health check
(#20604)
27513425c505 is described below
commit 27513425c50528dc3be2d8d7e1c85f1b82fcf5d8
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Dec 26 14:47:37 2025 +0100
CAMEL-20919: camel-ftp - Add producer health check (#20604)
* CAMEL-20919: camel-ftp - Add producer health check
---
components/camel-ftp/pom.xml | 4 +
.../component/file/remote/RemoteFileEndpoint.java | 5 ++
.../component/file/remote/RemoteFileProducer.java | 36 +++++++--
.../file/remote/RemoteFileProducerHealthCheck.java | 64 +++++++++++++++
.../integration/FtpProducerHealthCheckIT.java | 92 ++++++++++++++++++++++
.../itest/ftp/SpringFtpEndpointTest-context.xml | 6 ++
6 files changed, 200 insertions(+), 7 deletions(-)
diff --git a/components/camel-ftp/pom.xml b/components/camel-ftp/pom.xml
index a3ace651d372..b5b6c8e54e00 100644
--- a/components/camel-ftp/pom.xml
+++ b/components/camel-ftp/pom.xml
@@ -40,6 +40,10 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-file</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-health</artifactId>
+ </dependency>
<!-- needed for dynamic to -->
<dependency>
<groupId>org.apache.camel</groupId>
diff --git
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
index 2bc60da26adf..678012a536c2 100644
---
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
+++
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
@@ -93,6 +93,11 @@ public abstract class RemoteFileEndpoint<T> extends
GenericFileEndpoint<T> imple
setPollStrategy(new RemoteFilePollingConsumerPollStrategy());
}
+ @Override
+ public RemoteFileComponent getComponent() {
+ return (RemoteFileComponent) super.getComponent();
+ }
+
@Override
public boolean isSingletonProducer() {
// this producer is stateful because the remote file operations is not
diff --git
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
index 6b32ce298a56..2fa872448989 100644
---
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
+++
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
@@ -20,6 +20,8 @@ import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.component.file.GenericFileOperationFailedException;
import org.apache.camel.component.file.GenericFileProducer;
+import org.apache.camel.health.HealthCheckHelper;
+import org.apache.camel.health.WritableHealthCheckRepository;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +33,8 @@ public class RemoteFileProducer<T> extends
GenericFileProducer<T> {
private static final Logger LOG =
LoggerFactory.getLogger(RemoteFileProducer.class);
private boolean loggedIn;
+ private RemoteFileProducerHealthCheck producerHealthCheck;
+ private WritableHealthCheckRepository healthCheckRepository;
private transient String remoteFileProducerToString;
@@ -105,11 +109,14 @@ public class RemoteFileProducer<T> extends
GenericFileProducer<T> {
@Override
public void preWriteCheck(Exchange exchange) throws Exception {
- // before writing send a noop to see if the connection is alive and
- // works
+ doPreWriteCheck(exchange,
getEndpoint().getConfiguration().isSendNoop());
+ }
+
+ protected void doPreWriteCheck(Exchange exchange, boolean sendNoop) throws
Exception {
+ // before writing send a noop to see if the connection is alive and
works
boolean noop = false;
if (loggedIn) {
- if (getEndpoint().getConfiguration().isSendNoop()) {
+ if (sendNoop) {
try {
noop = getOperations().sendNoop();
} catch (Exception e) {
@@ -120,8 +127,7 @@ public class RemoteFileProducer<T> extends
GenericFileProducer<T> {
}
LOG.trace("preWriteCheck send noop success: {}", noop);
} else {
- // okay send noop is disabled then we would regard the op as
- // success
+ // okay send noop is disabled then we would regard the op as
success
noop = true;
LOG.trace("preWriteCheck send noop disabled");
}
@@ -162,14 +168,30 @@ public class RemoteFileProducer<T> extends
GenericFileProducer<T> {
@Override
protected void doStart() throws Exception {
LOG.debug("Starting");
+
+ // health-check is optional so discover and resolve
+ healthCheckRepository = HealthCheckHelper.getHealthCheckRepository(
+ endpoint.getCamelContext(),
+ "producers",
+ WritableHealthCheckRepository.class);
+
+ if (healthCheckRepository != null) {
+ producerHealthCheck = new RemoteFileProducerHealthCheck(this);
+
producerHealthCheck.setEnabled(this.getEndpoint().getComponent().isHealthCheckProducerEnabled());
+ healthCheckRepository.addHealthCheck(producerHealthCheck);
+ }
+
// do not connect when component starts, just wait until we process as
- // we will
- // connect at that time if needed
+ // we will connect at that time if needed
super.doStart();
}
@Override
protected void doStop() throws Exception {
+ if (healthCheckRepository != null && producerHealthCheck != null) {
+ healthCheckRepository.removeHealthCheck(producerHealthCheck);
+ producerHealthCheck = null;
+ }
try {
disconnect();
} catch (Exception e) {
diff --git
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducerHealthCheck.java
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducerHealthCheck.java
new file mode 100644
index 000000000000..d09bc3dc22f3
--- /dev/null
+++
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducerHealthCheck.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.file.GenericFileOperationFailedException;
+import org.apache.camel.health.HealthCheckResultBuilder;
+import org.apache.camel.impl.health.AbstractHealthCheck;
+
+/**
+ * FTP producer readiness health-check
+ */
+public class RemoteFileProducerHealthCheck extends AbstractHealthCheck {
+
+ private final RemoteFileProducer<?> producer;
+
+ public RemoteFileProducerHealthCheck(RemoteFileProducer<?> producer) {
+ super("camel", "producer:ftp-" +
producer.getEndpoint().getConfiguration().getHost());
+ this.producer = producer;
+ }
+
+ @Override
+ protected void doCall(HealthCheckResultBuilder builder, Map<String,
Object> options) {
+ Exchange dummy = producer.createExchange();
+ Exception cause = null;
+ try {
+ producer.doPreWriteCheck(dummy, true);
+ } catch (Exception e) {
+ cause = e;
+ }
+ if (cause != null) {
+ builder.down();
+ builder.message("FtpProducer is not ready");
+ builder.detail("serviceUrl",
producer.getEndpoint().getServiceUrl());
+ builder.error(cause);
+ if (cause instanceof GenericFileOperationFailedException gfe) {
+ int code = gfe.getCode();
+ String msg = gfe.getReason();
+ if (code > 0 && msg != null) {
+ builder.detail("ftp.code", code);
+ builder.detail("ftp.reason", msg.trim());
+ }
+ }
+ } else {
+ builder.up();
+ }
+ }
+}
diff --git
a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpProducerHealthCheckIT.java
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpProducerHealthCheckIT.java
new file mode 100644
index 000000000000..a67c8fb6cd03
--- /dev/null
+++
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpProducerHealthCheckIT.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote.integration;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.file.remote.FtpConstants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.health.HealthCheck;
+import org.apache.camel.health.HealthCheckHelper;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.awaitility.Awaitility.await;
+
+public class FtpProducerHealthCheckIT extends FtpServerTestSupport {
+
+ private String getFtpUrl() {
+ return
"ftp://admin@localhost:{{ftp.server.port}}/reply?password=admin";
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+ HealthCheckHelper.getHealthCheckRepository(context,
"producers").setEnabled(true);
+ return context;
+ }
+
+ @Test
+ public void testHealthCheck() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Bye World");
+ mock.expectedHeaderReceived(FtpConstants.FTP_REPLY_CODE, 226);
+ mock.expectedHeaderReceived(FtpConstants.FTP_REPLY_STRING, "226
Transfer complete.");
+
+ template.requestBodyAndHeader("direct:start", "Bye World",
Exchange.FILE_NAME, "hello.txt");
+
+ MockEndpoint.assertIsSatisfied(context);
+
+ await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+ Collection<HealthCheck.Result> res =
HealthCheckHelper.invokeReadiness(context);
+ boolean up = res.stream().allMatch(r ->
r.getState().equals(HealthCheck.State.UP));
+ Assertions.assertTrue(up, "readiness check");
+ });
+
+ // stop FTP server
+ service.shutdown();
+
+ // health-check should then become down
+ await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+ Collection<HealthCheck.Result> res =
HealthCheckHelper.invokeReadiness(context);
+ Optional<HealthCheck.Result> hr = res.stream().filter(r ->
r.getState().equals(HealthCheck.State.DOWN)).findFirst();
+ Assertions.assertTrue(hr.isPresent());
+ HealthCheck.Result r = hr.get();
+ Assertions.assertEquals(HealthCheck.State.DOWN, r.getState());
+ Assertions.assertEquals("FtpProducer is not ready",
r.getMessage().get());
+ Assertions.assertEquals(200, r.getDetails().get("ftp.code"));
+ Assertions.assertEquals("Connection refused",
r.getDetails().get("ftp.reason"));
+ });
+
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:start").to(getFtpUrl()).to("mock:result");
+ }
+ };
+ }
+
+}
diff --git
a/tests/camel-itest/src/test/resources/org/apache/camel/itest/ftp/SpringFtpEndpointTest-context.xml
b/tests/camel-itest/src/test/resources/org/apache/camel/itest/ftp/SpringFtpEndpointTest-context.xml
index 7cd19612cf2b..9bd51aba1d4f 100644
---
a/tests/camel-itest/src/test/resources/org/apache/camel/itest/ftp/SpringFtpEndpointTest-context.xml
+++
b/tests/camel-itest/src/test/resources/org/apache/camel/itest/ftp/SpringFtpEndpointTest-context.xml
@@ -26,7 +26,13 @@
<!-- START SNIPPET: e1 -->
<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+ <bean id="myFTPComponent"
class="org.apache.camel.component.file.remote.FtpComponent">
+ <property name="camelContext" ref="camel"/>
+ </bean>
+
<bean id="myFTPEndpoint"
class="org.apache.camel.component.file.remote.FtpEndpoint">
+ <property name="component" ref="myFTPComponent"/>
<property name="camelContext" ref="camel"/>
<property name="configuration" ref="ftpConfig"/>
<property name="initialDelay" value="1000"/>