CAMEL-7565: SFTP using PollEnrich with disconnect=true and delete=true does NOT delete the file.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/adf655d5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/adf655d5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/adf655d5 Branch: refs/heads/camel-2.15.x Commit: adf655d54abd0fb67326f4b50d35d5ee2676af7e Parents: 8e9b488 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Mar 21 12:12:36 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Mar 21 12:13:25 2015 +0100 ---------------------------------------------------------------------- .../file/remote/RemoteFileConsumer.java | 39 ++++++++-- ...nrichConsumeWithDisconnectAndDeleteTest.java | 78 ++++++++++++++++++++ 2 files changed, 111 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/adf655d5/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java index bb418e2..4a4ba2d 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java @@ -20,17 +20,19 @@ import java.io.IOException; import java.util.List; import org.apache.camel.Exchange; +import org.apache.camel.Ordered; import org.apache.camel.Processor; import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileConsumer; import org.apache.camel.component.file.GenericFileOperationFailedException; +import org.apache.camel.support.SynchronizationAdapter; /** * Base class for remote file consumers. */ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> { - protected boolean loggedIn; - protected boolean loggedInWarning; + protected transient boolean loggedIn; + protected transient boolean loggedInWarning; public RemoteFileConsumer(RemoteFileEndpoint<T> endpoint, Processor processor, RemoteFileOperations<T> operations) { super(endpoint, processor, operations); @@ -87,10 +89,6 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> { if (log.isTraceEnabled()) { log.trace("postPollCheck on " + getEndpoint().getConfiguration().remoteServerInformation()); } - if (getEndpoint().isDisconnect()) { - log.trace("postPollCheck disconnect from: {}", getEndpoint()); - disconnect(); - } } @Override @@ -98,6 +96,35 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> { // mark the exchange to be processed synchronously as the ftp client is not thread safe // and we must execute the callbacks in the same thread as this consumer exchange.setProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC, Boolean.TRUE); + + // defer disconnect til the UoW is complete - but only the last exchange from the batch should do that + boolean isLast = exchange.getProperty(Exchange.BATCH_COMPLETE, true, Boolean.class); + if (isLast && getEndpoint().isDisconnect()) { + exchange.addOnCompletion(new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + log.trace("postPollCheck disconnect from: {}", getEndpoint()); + disconnect(); + } + + @Override + public boolean allowHandover() { + // do not allow handover as we must execute the callbacks in the same thread as this consumer + return false; + } + + @Override + public int getOrder() { + // we want to disconnect last + return Ordered.LOWEST; + } + + public String toString() { + return "Disconnect"; + } + }); + } + return super.processExchange(exchange); } http://git-wip-us.apache.org/repos/asf/camel/blob/adf655d5/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java new file mode 100644 index 0000000..f547b15 --- /dev/null +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote; + +import java.io.File; + +import org.apache.camel.Exchange; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.file.remote.sftp.SftpServerTestSupport; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +public class SftpPollEnrichConsumeWithDisconnectAndDeleteTest extends SftpServerTestSupport { + + @Test + public void testSftpSimpleConsume() throws Exception { + if (!canTest()) { + return; + } + + String expected = "Hello World"; + + // create file using regular file + template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected, Exchange.FILE_NAME, "hello.txt"); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt"); + mock.expectedBodiesReceived(expected); + + ProducerTemplate triggerTemplate = context.createProducerTemplate(); + triggerTemplate.sendBody("vm:trigger", ""); + + assertMockEndpointsSatisfied(); + + long startFileDeletionCheckTime = System.currentTimeMillis(); + boolean fileExists = true; + while (System.currentTimeMillis() - startFileDeletionCheckTime < 3000) { // wait up to 3000ms for file to be deleted + File file = new File(FTP_ROOT_DIR + "/hello.txt"); + fileExists = file.exists(); + + if (fileExists) { + log.info("Will check that file has been deleted again in 200ms"); + Thread.sleep(200); + } + } + + assertFalse("The file should have been deleted", fileExists); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("vm:trigger") + .pollEnrich("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true&delete=true") + .routeId("foo") + .to("mock:result"); + } + }; + } +} \ No newline at end of file