This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch phc in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5d185ba1f0de85b8bc658afdc46d8b521a15980f Author: Claus Ibsen <[email protected]> AuthorDate: Fri Dec 26 11:22:06 2025 +0100 CAMEL-20919: camel-ftp - Add producer health check --- components/camel-ftp/pom.xml | 4 + .../component/file/remote/RemoteFileEndpoint.java | 5 ++ .../component/file/remote/RemoteFileProducer.java | 36 +++++++-- .../file/remote/RemoteFileProducerHealthCheck.java | 64 +++++++++++++++ .../integration/FtpProducerHealthCheckIT.java | 92 ++++++++++++++++++++++ 5 files changed, 194 insertions(+), 7 deletions(-) diff --git a/components/camel-ftp/pom.xml b/components/camel-ftp/pom.xml index a3ace651d372..b5b6c8e54e00 100644 --- a/components/camel-ftp/pom.xml +++ b/components/camel-ftp/pom.xml @@ -40,6 +40,10 @@ <groupId>org.apache.camel</groupId> <artifactId>camel-file</artifactId> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-health</artifactId> + </dependency> <!-- needed for dynamic to --> <dependency> <groupId>org.apache.camel</groupId> diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java index 2bc60da26adf..678012a536c2 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java @@ -93,6 +93,11 @@ public abstract class RemoteFileEndpoint<T> extends GenericFileEndpoint<T> imple setPollStrategy(new RemoteFilePollingConsumerPollStrategy()); } + @Override + public RemoteFileComponent getComponent() { + return (RemoteFileComponent) super.getComponent(); + } + @Override public boolean isSingletonProducer() { // this producer is stateful because the remote file operations is not diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java index 6b32ce298a56..2fa872448989 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java @@ -20,6 +20,8 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; import org.apache.camel.component.file.GenericFileOperationFailedException; import org.apache.camel.component.file.GenericFileProducer; +import org.apache.camel.health.HealthCheckHelper; +import org.apache.camel.health.WritableHealthCheckRepository; import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +33,8 @@ public class RemoteFileProducer<T> extends GenericFileProducer<T> { private static final Logger LOG = LoggerFactory.getLogger(RemoteFileProducer.class); private boolean loggedIn; + private RemoteFileProducerHealthCheck producerHealthCheck; + private WritableHealthCheckRepository healthCheckRepository; private transient String remoteFileProducerToString; @@ -105,11 +109,14 @@ public class RemoteFileProducer<T> extends GenericFileProducer<T> { @Override public void preWriteCheck(Exchange exchange) throws Exception { - // before writing send a noop to see if the connection is alive and - // works + doPreWriteCheck(exchange, getEndpoint().getConfiguration().isSendNoop()); + } + + protected void doPreWriteCheck(Exchange exchange, boolean sendNoop) throws Exception { + // before writing send a noop to see if the connection is alive and works boolean noop = false; if (loggedIn) { - if (getEndpoint().getConfiguration().isSendNoop()) { + if (sendNoop) { try { noop = getOperations().sendNoop(); } catch (Exception e) { @@ -120,8 +127,7 @@ public class RemoteFileProducer<T> extends GenericFileProducer<T> { } LOG.trace("preWriteCheck send noop success: {}", noop); } else { - // okay send noop is disabled then we would regard the op as - // success + // okay send noop is disabled then we would regard the op as success noop = true; LOG.trace("preWriteCheck send noop disabled"); } @@ -162,14 +168,30 @@ public class RemoteFileProducer<T> extends GenericFileProducer<T> { @Override protected void doStart() throws Exception { LOG.debug("Starting"); + + // health-check is optional so discover and resolve + healthCheckRepository = HealthCheckHelper.getHealthCheckRepository( + endpoint.getCamelContext(), + "producers", + WritableHealthCheckRepository.class); + + if (healthCheckRepository != null) { + producerHealthCheck = new RemoteFileProducerHealthCheck(this); + producerHealthCheck.setEnabled(this.getEndpoint().getComponent().isHealthCheckProducerEnabled()); + healthCheckRepository.addHealthCheck(producerHealthCheck); + } + // do not connect when component starts, just wait until we process as - // we will - // connect at that time if needed + // we will connect at that time if needed super.doStart(); } @Override protected void doStop() throws Exception { + if (healthCheckRepository != null && producerHealthCheck != null) { + healthCheckRepository.removeHealthCheck(producerHealthCheck); + producerHealthCheck = null; + } try { disconnect(); } catch (Exception e) { diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducerHealthCheck.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducerHealthCheck.java new file mode 100644 index 000000000000..d09bc3dc22f3 --- /dev/null +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducerHealthCheck.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote; + +import java.util.Map; + +import org.apache.camel.Exchange; +import org.apache.camel.component.file.GenericFileOperationFailedException; +import org.apache.camel.health.HealthCheckResultBuilder; +import org.apache.camel.impl.health.AbstractHealthCheck; + +/** + * FTP producer readiness health-check + */ +public class RemoteFileProducerHealthCheck extends AbstractHealthCheck { + + private final RemoteFileProducer<?> producer; + + public RemoteFileProducerHealthCheck(RemoteFileProducer<?> producer) { + super("camel", "producer:ftp-" + producer.getEndpoint().getConfiguration().getHost()); + this.producer = producer; + } + + @Override + protected void doCall(HealthCheckResultBuilder builder, Map<String, Object> options) { + Exchange dummy = producer.createExchange(); + Exception cause = null; + try { + producer.doPreWriteCheck(dummy, true); + } catch (Exception e) { + cause = e; + } + if (cause != null) { + builder.down(); + builder.message("FtpProducer is not ready"); + builder.detail("serviceUrl", producer.getEndpoint().getServiceUrl()); + builder.error(cause); + if (cause instanceof GenericFileOperationFailedException gfe) { + int code = gfe.getCode(); + String msg = gfe.getReason(); + if (code > 0 && msg != null) { + builder.detail("ftp.code", code); + builder.detail("ftp.reason", msg.trim()); + } + } + } else { + builder.up(); + } + } +} diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpProducerHealthCheckIT.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpProducerHealthCheckIT.java new file mode 100644 index 000000000000..a67c8fb6cd03 --- /dev/null +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/integration/FtpProducerHealthCheckIT.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote.integration; + +import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.file.remote.FtpConstants; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.health.HealthCheck; +import org.apache.camel.health.HealthCheckHelper; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.awaitility.Awaitility.await; + +public class FtpProducerHealthCheckIT extends FtpServerTestSupport { + + private String getFtpUrl() { + return "ftp://admin@localhost:{{ftp.server.port}}/reply?password=admin"; + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + HealthCheckHelper.getHealthCheckRepository(context, "producers").setEnabled(true); + return context; + } + + @Test + public void testHealthCheck() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Bye World"); + mock.expectedHeaderReceived(FtpConstants.FTP_REPLY_CODE, 226); + mock.expectedHeaderReceived(FtpConstants.FTP_REPLY_STRING, "226 Transfer complete."); + + template.requestBodyAndHeader("direct:start", "Bye World", Exchange.FILE_NAME, "hello.txt"); + + MockEndpoint.assertIsSatisfied(context); + + await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + Collection<HealthCheck.Result> res = HealthCheckHelper.invokeReadiness(context); + boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP)); + Assertions.assertTrue(up, "readiness check"); + }); + + // stop FTP server + service.shutdown(); + + // health-check should then become down + await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + Collection<HealthCheck.Result> res = HealthCheckHelper.invokeReadiness(context); + Optional<HealthCheck.Result> hr = res.stream().filter(r -> r.getState().equals(HealthCheck.State.DOWN)).findFirst(); + Assertions.assertTrue(hr.isPresent()); + HealthCheck.Result r = hr.get(); + Assertions.assertEquals(HealthCheck.State.DOWN, r.getState()); + Assertions.assertEquals("FtpProducer is not ready", r.getMessage().get()); + Assertions.assertEquals(200, r.getDetails().get("ftp.code")); + Assertions.assertEquals("Connection refused", r.getDetails().get("ftp.reason")); + }); + + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to(getFtpUrl()).to("mock:result"); + } + }; + } + +}
