CAMEL-6671: FTP consumer. Added the useList and ignoreFileNotFound options so that a single named file can be downloaded without issuing the FTP LIST command. This is needed in some use cases, such as when the user has no permission to run FTP LIST.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c948c2b8 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c948c2b8 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c948c2b8 Branch: refs/heads/camel-2.12.x Commit: c948c2b8d237c1b191ede44491160b12478c027b Parents: b56b1c5 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Sep 5 12:44:16 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Sep 5 13:00:43 2013 +0200 ---------------------------------------------------------------------- .../component/file/GenericFileConsumer.java | 64 +++++++++++++---- .../camel/impl/ScheduledPollConsumer.java | 1 + .../component/file/remote/FtpConsumer.java | 40 +++++++++-- .../file/remote/RemoteFileConfiguration.java | 35 ++++++++- .../file/remote/RemoteFileConsumer.java | 4 +- .../component/file/remote/SftpConsumer.java | 1 + .../file/remote/FromFtpUseListFalseTest.java | 75 ++++++++++++++++++++ .../FtpConsumerTemplateUseListFalseTest.java | 74 +++++++++++++++++++ 8 files changed, 271 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java index 0f8be8e..c55755e 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java @@ -171,6 +171,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum public int processBatch(Queue<Object> exchanges) { int total = exchanges.size(); + int answer = total; // 
limit if needed if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) { @@ -191,19 +192,25 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum pendingExchanges = total - index - 1; // process the current exchange + boolean started; if (customProcessor != null) { // use a custom processor - customProcessExchange(exchange, customProcessor); + started = customProcessExchange(exchange, customProcessor); } else { // process the exchange regular - processExchange(exchange); + started = processExchange(exchange); + } + + // if we did not start process the file then decremember the counter + if (!started) { + answer--; } } // drain any in progress files as we are done with this batch removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0); - return total; + return answer; } protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) { @@ -277,11 +284,28 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum } /** + * Whether to ignore if the file cannot be retrieved. + * <p/> + * By default an {@link GenericFileOperationFailedException} is thrown if the file cannot be retrieved. + * <p/> + * This method allows to suppress this and just ignore that. + * + * @param name the file name + * @param exchange the exchange + * @return <tt>true</tt> to ignore, <tt>false</tt> is the default. 
+ */ + protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange) { + return false; + } + + /** * Processes the exchange * * @param exchange the exchange + * @return <tt>true</tt> if the file was started to be processed, <tt>false</tt> if the file was not started + * to be processed, for some reason (not found, or aborted etc) */ - protected void processExchange(final Exchange exchange) { + protected boolean processExchange(final Exchange exchange) { GenericFile<T> file = getExchangeFileProperty(exchange); log.trace("Processing file: {}", file); @@ -303,7 +327,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum // begin returned false, so remove file from the in progress list as its no longer in progress endpoint.getInProgressRepository().remove(absoluteFileName); } - return; + return false; } } catch (Exception e) { // remove file from the in progress list due to failure @@ -311,7 +335,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum String msg = endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage(); handleException(msg, e); - return; + return false; } // must use file from exchange as it can be updated due the @@ -328,10 +352,17 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum // retrieve the file and check it was a success boolean retrieved = operations.retrieveFile(name, exchange); if (!retrieved) { - // throw exception to handle the problem with retrieving the file - // then if the method return false or throws an exception is handled the same in here - // as in both cases an exception is being thrown - throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint); + if (ignoreCannotRetrieveFile(name, exchange)) { + log.trace("Cannot retrieve file {} maybe it does not exists. 
Ignorning.", name); + // remove file from the in progress list as we could not retrieve it, but should ignore + endpoint.getInProgressRepository().remove(absoluteFileName); + return false; + } else { + // throw exception to handle the problem with retrieving the file + // then if the method return false or throws an exception is handled the same in here + // as in both cases an exception is being thrown + throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint); + } } log.trace("Retrieved file: {} from: {}", name, endpoint); @@ -368,6 +399,8 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum String msg = "Error processing file " + file + " due to " + e.getMessage(); handleException(msg, e); } + + return true; } /** @@ -385,7 +418,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum * @param exchange the exchange * @param processor the custom processor */ - protected void customProcessExchange(final Exchange exchange, final Processor processor) { + protected boolean customProcessExchange(final Exchange exchange, final Processor processor) { GenericFile<T> file = getExchangeFileProperty(exchange); log.trace("Custom processing file: {}", file); @@ -407,6 +440,8 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum // as the name can be different when using preMove option endpoint.getInProgressRepository().remove(absoluteFileName); } + + return true; } /** @@ -508,7 +543,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum // use file expression for a simple dynamic file filter if (endpoint.getFileName() != null) { - evaluateFileExpression(); + fileExpressionResult = evaluateFileExpression(); if (fileExpressionResult != null) { if (!name.equals(fileExpressionResult)) { return false; @@ -557,12 +592,13 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum return 
!endpoint.getInProgressRepository().add(key); } - private void evaluateFileExpression() { - if (fileExpressionResult == null) { + protected String evaluateFileExpression() { + if (fileExpressionResult == null && endpoint.getFileName() != null) { // create a dummy exchange as Exchange is needed for expression evaluation Exchange dummy = endpoint.createExchange(); fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class); } + return fileExpressionResult; } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java index 6b34ac7..98b9e42 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java @@ -185,6 +185,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R if (begin) { retryCounter++; polledMessages = poll(); + LOG.trace("Polled {} messages", polledMessages); if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) { // send an "empty" exchange http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java index 7b042ea..5d7dca5 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java 
@@ -16,8 +16,10 @@ */ package org.apache.camel.component.file.remote; +import java.util.ArrayList; import java.util.List; +import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.file.GenericFile; import org.apache.camel.util.FileUtil; @@ -83,11 +85,24 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> { } log.trace("Polling directory: {}", dir); - List<FTPFile> files; - if (isStepwise()) { - files = operations.listFiles(); + List<FTPFile> files = null; + if (isUseList()) { + if (isStepwise()) { + files = operations.listFiles(); + } else { + files = operations.listFiles(dir); + } } else { - files = operations.listFiles(dir); + // we cannot use the LIST command(s) so we can only poll a named file + // so created a pseudo file with that name + FTPFile file = new FTPFile(); + file.setType(FTPFile.FILE_TYPE); + fileExpressionResult = evaluateFileExpression(); + if (fileExpressionResult != null) { + file.setName(fileExpressionResult); + files = new ArrayList<FTPFile>(1); + files.add(file); + } } if (files == null || files.isEmpty()) { @@ -149,6 +164,18 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> { return false; } + @Override + protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange) { + if (getEndpoint().getConfiguration().isIgnoreFileNotFound()) { + // error code 550 is file not found + int code = exchange.getIn().getHeader(FtpConstants.FTP_REPLY_CODE, 0, int.class); + if (code == 550) { + return true; + } + } + return super.ignoreCannotRetrieveFile(name, exchange); + } + private RemoteFile<FTPFile> asRemoteFile(String absolutePath, FTPFile file) { RemoteFile<FTPFile> answer = new RemoteFile<FTPFile>(); @@ -193,6 +220,11 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> { return config.isStepwise(); } + private boolean isUseList() { + RemoteFileConfiguration config = (RemoteFileConfiguration) endpoint.getConfiguration(); + return config.isUseList(); + } + 
@Override public String toString() { return "FtpConsumer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]"; http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java index e5325ba..bbe0a18 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java @@ -33,7 +33,7 @@ public abstract class RemoteFileConfiguration extends GenericFileConfiguration { * Windows = Path separator \ is used * Auto = Use existing path separator in file name */ - public enum PathSeparator { UNIX, Windows, Auto }; + public enum PathSeparator { UNIX, Windows, Auto } private String protocol; private String username; @@ -50,6 +50,8 @@ public abstract class RemoteFileConfiguration extends GenericFileConfiguration { private boolean stepwise = true; private PathSeparator separator = PathSeparator.Auto; private boolean streamDownload; + private boolean useList = true; + private boolean ignoreFileNotFound; public RemoteFileConfiguration() { } @@ -270,13 +272,40 @@ public abstract class RemoteFileConfiguration extends GenericFileConfiguration { * Sets the download method to use when not using a local working directory. If set to true, * the remote files are streamed to the route as they are read. When set to false, the remote files * are loaded into memory before being sent into the route. 
- * - * @param streamDownload */ public void setStreamDownload(boolean streamDownload) { this.streamDownload = streamDownload; } + public boolean isUseList() { + return useList; + } + + /** + * Whether to allow using LIST command when downloading a file. + * <p/> + * Default is <tt>true</tt>. In some use cases you may want to download + * a specific file and are not allowed to use the LIST command, and therefore + * you can set this option to <tt>false</tt>. + */ + public void setUseList(boolean useList) { + this.useList = useList; + } + + public boolean isIgnoreFileNotFound() { + return ignoreFileNotFound; + } + + /** + * Whether to ignore when trying to download a file which does not exist. + * <p/> + * By default when a file does not exists, then an exception is thrown. + * Setting this option to <tt>true</tt> allows to ignore that instead. + */ + public void setIgnoreFileNotFound(boolean ignoreFileNotFound) { + this.ignoreFileNotFound = ignoreFileNotFound; + } + /** * Normalizes the given path according to the configured path separator. 
* http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java index 39e23ce..de242da 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java @@ -92,11 +92,11 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> { } @Override - protected void processExchange(Exchange exchange) { + protected boolean processExchange(Exchange exchange) { // mark the exchange to be processed synchronously as the ftp client is not thread safe // and we must execute the callbacks in the same thread as this consumer exchange.setProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC, Boolean.TRUE); - super.processExchange(exchange); + return super.processExchange(exchange); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java index 1dbfd9d..765aac4 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java @@ -19,6 +19,7 @@ package org.apache.camel.component.file.remote; import java.util.List; import com.jcraft.jsch.ChannelSftp; 
+import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.file.GenericFile; import org.apache.camel.util.FileUtil; http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java new file mode 100644 index 0000000..afd2763 --- /dev/null +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpUseListFalseTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.file.remote; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Producer; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test to poll a fixed file from the FTP server without using the list command. + */ +public class FromFtpUseListFalseTest extends FtpServerTestSupport { + + private String getFtpUrl() { + return "ftp://admin@localhost:" + getPort() + "/nolist/?password=admin" + + "&stepwise=false&useList=false&ignoreFileNotFound=true&fileName=report.txt&delete=true"; + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + prepareFtpServer(); + } + + @Test + public void testUseListFalse() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World from FTPServer"); + + // just allow to poll a few more times, but we should only get the file once + Thread.sleep(1000); + + mock.assertIsSatisfied(); + } + + private void prepareFtpServer() throws Exception { + // prepares the FTP Server by creating a file on the server that we want to unit + // test that we can pool and store as a local file + Endpoint endpoint = context.getEndpoint("ftp://admin@localhost:" + getPort() + "/nolist/?password=admin&binary=false"); + Exchange exchange = endpoint.createExchange(); + exchange.getIn().setBody("Hello World from FTPServer"); + exchange.getIn().setHeader(Exchange.FILE_NAME, "report.txt"); + Producer producer = endpoint.createProducer(); + producer.start(); + producer.process(exchange); + producer.stop(); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from(getFtpUrl()).to("mock:result"); + } + }; + } +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/c948c2b8/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java new file mode 100644 index 0000000..6554e62 --- /dev/null +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerTemplateUseListFalseTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Producer; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test to poll a fixed file from the FTP server without using the list command. 
+ */ +public class FtpConsumerTemplateUseListFalseTest extends FtpServerTestSupport { + + private String getFtpUrl() { + return "ftp://admin@localhost:" + getPort() + "/nolist/?password=admin" + + "&stepwise=false&useList=false&ignoreFileNotFound=true&delete=true"; + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + prepareFtpServer(); + } + + @Test + public void testUseListFalse() throws Exception { + String data = consumer.receiveBody(getFtpUrl() + "&fileName=report.txt", 5000, String.class); + assertEquals("Hello World from FTPServer", data); + + // try a 2nd time and the file is deleted + data = consumer.receiveBody(getFtpUrl() + "&fileName=report.txt", 1000, String.class); + assertNull("The file should no longer exist", data); + + // and try a non existing file name + data = consumer.receiveBody(getFtpUrl() + "&fileName=report2.txt", 1000, String.class); + assertNull("The file should no longer exist", data); + } + + private void prepareFtpServer() throws Exception { + // prepares the FTP Server by creating a file on the server that we want to unit + // test that we can pool and store as a local file + Endpoint endpoint = context.getEndpoint("ftp://admin@localhost:" + getPort() + "/nolist/?password=admin&binary=false"); + Exchange exchange = endpoint.createExchange(); + exchange.getIn().setBody("Hello World from FTPServer"); + exchange.getIn().setHeader(Exchange.FILE_NAME, "report.txt"); + Producer producer = endpoint.createProducer(); + producer.start(); + producer.process(exchange); + producer.stop(); + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } + +}