This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch camel-3.4.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.4.x by this push: new aae93b5 Add reconnect during FTPOperation delete and rename aae93b5 is described below commit aae93b567d160b8e33c496d3f4781f0015d7068d Author: Lukas Holthof <lukas.holt...@sap.com> AuthorDate: Wed Jul 15 22:09:50 2020 +0200 Add reconnect during FTPOperation delete and rename Change-Id: Ie97fee70f26792793166c491755665153bc3e746 --- .../camel/component/file/remote/FtpOperations.java | 19 +++- .../FtpConsumerPostProcessingOnDisconnect.java | 117 +++++++++++++++++++++ .../file/remote/FtpServerTestSupport.java | 17 +++ 3 files changed, 152 insertions(+), 1 deletion(-) diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java index 5837418..0a83906 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java @@ -303,12 +303,13 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> { @Override public boolean deleteFile(String name) throws GenericFileOperationFailedException { log.debug("Deleting file: {}", name); - + boolean result; String target = name; String currentDir = null; try { + reconnectIfNecessary(null); if (endpoint.getConfiguration().isStepwise()) { // remember current directory currentDir = getCurrentDirectory(); @@ -343,6 +344,7 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> { public boolean renameFile(String from, String to) throws GenericFileOperationFailedException { log.debug("Renaming file: {} to: {}", from, to); try { + reconnectIfNecessary(null); return client.rename(from, to); } catch (IOException e) { throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e); @@ -986,5 +988,20 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> { public FTPClient getClient() { return client; } + + private void reconnectIfNecessary(Exchange exchange) throws GenericFileOperationFailedException { + if (isConnected()) { + log.trace("sendNoOp to check if connection should be reconnected"); + try { + client.sendNoOp(); + } catch (IOException e) { + log.trace("NoOp to server failed, try to reconnect"); + connect(endpoint.getConfiguration(), exchange); + } + } else { + log.trace("Client is not connected, try to reconnect"); + connect(endpoint.getConfiguration(), exchange); + } + } } diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerPostProcessingOnDisconnect.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerPostProcessingOnDisconnect.java new file mode 100644 index 0000000..8bff7dc --- /dev/null +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerPostProcessingOnDisconnect.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote; + +import java.io.File; +import java.io.IOException; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class FtpConsumerPostProcessingOnDisconnect extends FtpServerTestSupport { + private static final String SAMPLE_FILE_NAME_1 = String.format("sample-1-%s.txt", + FtpConsumerPostProcessingOnDisconnect.class.getSimpleName()); + private static final String SAMPLE_FILE_NAME_2 = String.format("sample-2-%s.txt", + FtpConsumerPostProcessingOnDisconnect.class.getSimpleName()); + private static final String SAMPLE_FILE_CHARSET = "iso-8859-1"; + private static final String SAMPLE_FILE_PAYLOAD = "abc"; + + @Test + public void testConsumeDelete() throws Exception { + if (!canTest()) { + return; + } + + // prepare sample file to be consumed by FTP consumer + createSampleFile(SAMPLE_FILE_NAME_1); + + // Prepare expectations + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.expectedBodiesReceived(SAMPLE_FILE_PAYLOAD); + + context.getRouteController().startRoute("foo"); + + // Check that expectations are satisfied + assertMockEndpointsSatisfied(); + + // File is deleted + Thread.sleep(250); + File deletedFile = new File(FTP_ROOT_DIR + "/" + SAMPLE_FILE_NAME_1); + assertFalse(deletedFile.exists(), "File should have been deleted: " + deletedFile); + } + + @Test + public void testConsumeMove() throws Exception { + if (!canTest()) { + return; + } + + // moved file after its processed + String movedFile = FTP_ROOT_DIR + "/.camel/" + SAMPLE_FILE_NAME_2; + + // prepare sample file to be consumed by FTP consumer + createSampleFile(SAMPLE_FILE_NAME_2); + + // Prepare expectations + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.expectedBodiesReceived(SAMPLE_FILE_PAYLOAD); + // use mock to assert that the file will be moved there eventually + mock.expectedFileExists(movedFile); + + context.getRouteController().startRoute("bar"); + + // Check that expectations are satisfied + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("ftp://admin@localhost:" + getPort() + "?password=admin&delete=true").routeId("foo").noAutoStartup() + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + disconnectAllSessions(); // disconnect all Sessions on FTP server + } + }).to("mock:result"); + from("ftp://admin@localhost:" + getPort() + "?password=admin&noop=false&move=.camel").routeId("bar").noAutoStartup() + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + disconnectAllSessions(); // disconnect all Sessions on FTP server + } + }).to("mock:result"); + } + }; + } + + private void createSampleFile(String fileName) throws IOException { + File file = new File(FTP_ROOT_DIR + "/" + fileName); + FileUtils.write(file, SAMPLE_FILE_PAYLOAD, SAMPLE_FILE_CHARSET); + } + +} diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java index 0585a29..2e1d772 100644 --- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java @@ -17,7 +17,10 @@ package org.apache.camel.component.file.remote; import java.io.File; +import java.io.IOException; import java.security.NoSuchAlgorithmException; +import java.util.Map; +import java.util.Set; import org.apache.camel.Exchange; import org.apache.camel.util.ObjectHelper; @@ -26,6 +29,9 @@ import org.apache.ftpserver.FtpServer; import org.apache.ftpserver.FtpServerFactory; import org.apache.ftpserver.filesystem.nativefs.NativeFileSystemFactory; import org.apache.ftpserver.ftplet.UserManager; +import org.apache.ftpserver.impl.DefaultFtpServer; +import org.apache.ftpserver.impl.FtpIoSession; +import org.apache.ftpserver.listener.Listener; import org.apache.ftpserver.listener.ListenerFactory; import org.apache.ftpserver.usermanager.ClearTextPasswordEncryptor; import org.apache.ftpserver.usermanager.PropertiesUserManagerFactory; @@ -138,5 +144,16 @@ public abstract class FtpServerTestSupport extends BaseServerTestSupport { public void sendFile(String url, Object body, String fileName) { template.sendBodyAndHeader(url, body, Exchange.FILE_NAME, simple(fileName)); } + + protected void disconnectAllSessions() throws IOException { + // stop all listeners + Map<String, Listener> listeners = ((DefaultFtpServer) ftpServer).getListeners(); + for (Listener listener : listeners.values()) { + Set<FtpIoSession> sessions = listener.getActiveSessions(); + for (FtpIoSession session : sessions) { + session.closeNow(); + } + } + } }