Author: dvaleri Date: Wed Feb 20 14:47:06 2013 New Revision: 1448198 URL: http://svn.apache.org/r1448198 Log: [CAMEL-6090] Added streaming option for retrieval of remote files.
Added: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/CopyOfFtpSimpleConsumeStreamingWithMultipleFilesTest.java - copied, changed from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingPartialReadTest.java (with props) camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingTest.java - copied, changed from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingWithMultipleFilesTest.java - copied, changed from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingPartialReadTest.java - copied, changed from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingTest.java - copied, changed from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingWithMultipleFilesTest.java - copied, changed from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOperations.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileComponent.java camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java?rev=1448198&r1=1448197&r2=1448198&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java Wed Feb 20 14:47:06 2013 @@ -155,6 +155,11 @@ public class FileOperations implements G // noop as we use type converters to read the body content for java.io.File return true; } + + @Override + public void releaseRetreivedFileResources(Exchange exchange) throws GenericFileOperationFailedException { + // noop as we used type converters to read the body content for java.io.File + } public boolean storeFile(String fileName, Exchange exchange) throws GenericFileOperationFailedException { ObjectHelper.notNull(endpoint, "endpoint"); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOperations.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOperations.java?rev=1448198&r1=1448197&r2=1448198&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOperations.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOperations.java Wed Feb 20 14:47:06 2013 @@ -77,6 +77,14 @@ public interface GenericFileOperations<T * @throws GenericFileOperationFailedException can be thrown */ boolean retrieveFile(String name, Exchange exchange) throws GenericFileOperationFailedException; + + /** + * Releases the resources consumed by a retrieved file + * + * @param exchange exchange with the content of the file + * @throws GenericFileOperationFailedException can be thrown + */ + void releaseRetreivedFileResources(Exchange exchange) throws GenericFileOperationFailedException; /** * Stores the content as a new remote file (upload) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java?rev=1448198&r1=1448197&r2=1448198&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java Wed Feb 20 14:47:06 2013 @@ -62,6 +62,7 @@ public abstract class GenericFileProcess } deleteLocalWorkFile(exchange); + operations.releaseRetreivedFileResources(exchange); } public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, 
GenericFile<T> file) throws Exception { @@ -70,6 +71,7 @@ public abstract class GenericFileProcess } deleteLocalWorkFile(exchange); + operations.releaseRetreivedFileResources(exchange); } public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { @@ -78,6 +80,7 @@ public abstract class GenericFileProcess } deleteLocalWorkFile(exchange); + operations.releaseRetreivedFileResources(exchange); } public GenericFileExclusiveReadLockStrategy<T> getExclusiveReadLockStrategy() { Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java?rev=1448198&r1=1448197&r2=1448198&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java (original) +++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java Wed Feb 20 14:47:06 2013 @@ -298,17 +298,29 @@ public class FtpOperations implements Re return retrieveFileToStreamInBody(name, exchange); } } + + @Override + public void releaseRetreivedFileResources(Exchange exchange) throws GenericFileOperationFailedException { + InputStream is = exchange.getIn().getHeader(RemoteFileComponent.REMOTE_FILE_INPUT_STREAM, InputStream.class); + + if (is != null) { + try { + is.close(); + client.completePendingCommand(); + } catch (IOException e) { + throw new GenericFileOperationFailedException(e.getMessage(), e); + } + } + } @SuppressWarnings("unchecked") private boolean retrieveFileToStreamInBody(String name, Exchange exchange) throws GenericFileOperationFailedException { OutputStream os = null; boolean result; try { - os = new ByteArrayOutputStream(); GenericFile<FTPFile> target = 
(GenericFile<FTPFile>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE); ObjectHelper.notNull(target, "Exchange should have the " + FileComponent.FILE_EXCHANGE_FILE + " set"); - target.setBody(os); - + String remoteName = name; String currentDir = null; if (endpoint.getConfiguration().isStepwise()) { @@ -326,7 +338,17 @@ public class FtpOperations implements Re } log.trace("Client retrieveFile: {}", remoteName); - result = client.retrieveFile(remoteName, os); + + if (endpoint.getConfiguration().isStreamDownload()) { + InputStream is = client.retrieveFileStream(remoteName); + target.setBody(is); + exchange.getIn().setHeader(RemoteFileComponent.REMOTE_FILE_INPUT_STREAM, is); + result = true; + } else { + os = new ByteArrayOutputStream(); + target.setBody(os); + result = client.retrieveFile(remoteName, os); + } // change back to current directory if (endpoint.getConfiguration().isStepwise()) { Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileComponent.java?rev=1448198&r1=1448197&r2=1448198&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileComponent.java (original) +++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileComponent.java Wed Feb 20 14:47:06 2013 @@ -26,6 +26,8 @@ import org.apache.camel.component.file.G * @param <T> the type of file that these remote endpoints provide */ public abstract class RemoteFileComponent<T> extends GenericFileComponent<T> { + + public static final String REMOTE_FILE_INPUT_STREAM = "CamelRemoteFileInputStream"; public RemoteFileComponent() { } Modified: 
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java?rev=1448198&r1=1448197&r2=1448198&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java (original) +++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java Wed Feb 20 14:47:06 2013 @@ -49,6 +49,7 @@ public abstract class RemoteFileConfigur private String siteCommand; private boolean stepwise = true; private PathSeparator separator = PathSeparator.Auto; + private boolean streamDownload; public RemoteFileConfiguration() { } @@ -260,6 +261,21 @@ public abstract class RemoteFileConfigur public void setSeparator(PathSeparator separator) { this.separator = separator; } + + public boolean isStreamDownload() { + return streamDownload; + } + + /** + * Sets the download method to use when not using a local working directory. If set to true, + * the remote files are streamed to the route as they are read. When set to false, the remote files + * are loaded into memory before being sent into the route. + * + * @param streamDownload + */ + public void setStreamDownload(boolean streamDownload) { + this.streamDownload = streamDownload; + } /** * Normalizes the given path according to the configured path separator. 
Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java?rev=1448198&r1=1448197&r2=1448198&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java (original) +++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java Wed Feb 20 14:47:06 2013 @@ -489,18 +489,29 @@ public class SftpOperations implements R return retrieveFileToStreamInBody(name, exchange); } } + + @Override + public void releaseRetreivedFileResources(Exchange exchange) throws GenericFileOperationFailedException { + InputStream is = exchange.getIn().getHeader(RemoteFileComponent.REMOTE_FILE_INPUT_STREAM, InputStream.class); + + if (is != null) { + try { + is.close(); + } catch (IOException e) { + throw new GenericFileOperationFailedException(e.getMessage(), e); + } + } + } @SuppressWarnings("unchecked") private boolean retrieveFileToStreamInBody(String name, Exchange exchange) throws GenericFileOperationFailedException { OutputStream os = null; String currentDir = null; try { - os = new ByteArrayOutputStream(); GenericFile<ChannelSftp.LsEntry> target = (GenericFile<ChannelSftp.LsEntry>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE); ObjectHelper.notNull(target, "Exchange should have the " + FileComponent.FILE_EXCHANGE_FILE + " set"); - target.setBody(os); - + String remoteName = name; if (endpoint.getConfiguration().isStepwise()) { // remember current directory @@ -518,7 +529,15 @@ public class SftpOperations implements R // use input stream which works with Apache SSHD used for testing InputStream is = channel.get(remoteName); - IOHelper.copyAndCloseInput(is, os); + + if 
(endpoint.getConfiguration().isStreamDownload()) { + target.setBody(is); + exchange.getIn().setHeader(RemoteFileComponent.REMOTE_FILE_INPUT_STREAM, is); + } else { + os = new ByteArrayOutputStream(); + target.setBody(os); + IOHelper.copyAndCloseInput(is, os); + } return true; } catch (IOException e) { Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/CopyOfFtpSimpleConsumeStreamingWithMultipleFilesTest.java (from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/CopyOfFtpSimpleConsumeStreamingWithMultipleFilesTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/CopyOfFtpSimpleConsumeStreamingWithMultipleFilesTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java&r1=1447882&r2=1448198&rev=1448198&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java (original) +++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/CopyOfFtpSimpleConsumeStreamingWithMultipleFilesTest.java Wed Feb 20 14:47:06 2013 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.camel.component.file.remote.sftp; +package org.apache.camel.component.file.remote; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; @@ -24,10 +24,10 @@ import org.junit.Test; /** * @version */ -public class SftpSimpleConsumeTest extends SftpServerTestSupport { +public class CopyOfFtpSimpleConsumeStreamingWithMultipleFilesTest extends FtpServerTestSupport { @Test - public void testSftpSimpleConsume() throws Exception { + public void testFtpSimpleConsumeAbsolute() throws Exception { if (!canTest()) { return; } @@ -35,7 +35,10 @@ public class SftpSimpleConsumeTest exten String expected = "Hello World"; // create file using regular file - template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected, Exchange.FILE_NAME, "hello.txt"); + + // FTP Server does not support absolute path, so lets simulate it + String path = FTP_ROOT_DIR + "/tmp/mytemp"; + template.sendBodyAndHeader("file:" + path, expected, Exchange.FILE_NAME, "hello.txt"); MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); @@ -51,7 +54,9 @@ public class SftpSimpleConsumeTest exten return new RouteBuilder() { @Override public void configure() throws Exception { - from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true") + // notice we use an absolute starting path: /tmp/mytemp + // - we must remember to use // slash because of the url separator + from("ftp://localhost:" + getPort() + "//tmp/mytemp?username=admin&password=admin&delay=10s&disconnect=true&streamDownload=true") .routeId("foo").noAutoStartup() .to("mock:result"); } Added: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingPartialReadTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingPartialReadTest.java?rev=1448198&view=auto 
============================================================================== --- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingPartialReadTest.java (added) +++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingPartialReadTest.java Wed Feb 20 14:47:06 2013 @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.file.remote; + +import java.io.File; +import java.io.InputStream; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +/** + * @version + */ +public class FtpSimpleConsumeStreamingPartialReadTest extends FtpServerTestSupport { + + @Test + public void testFtpSimpleConsumeAbsolute() throws Exception { + if (!canTest()) { + return; + } + + String expected = "Hello World"; + + // create file using regular file + + // FTP Server does not support absolute path, so lets simulate it + String path = FTP_ROOT_DIR + "/tmp/mytemp"; + template.sendBodyAndHeader("file:" + path, expected, Exchange.FILE_NAME, "hello.txt"); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt"); + + context.startRoute("foo"); + + assertMockEndpointsSatisfied(); + + // Wait a little bit for the move to finish. 
+ Thread.sleep(2000); + + File resultFile = new File(path + File.separator + "failed", "hello.txt"); + assertTrue(resultFile.exists()); + assertFalse(resultFile.isDirectory()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // notice we use an absolute starting path: /tmp/mytemp + // - we must remember to use // slash because of the url separator + from("ftp://localhost:" + getPort() + + "//tmp/mytemp?username=admin&password=admin&delay=10s&disconnect=true&streamDownload=true" + + "&move=done&moveFailed=failed") + .routeId("foo").noAutoStartup() + .process(new Processor() { + + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().getBody(InputStream.class).read(); + + } + }) + .to("mock:result") + .process(new Processor() { + + @Override + public void process(Exchange exchange) throws Exception { + throw new Exception("INTENTIONAL ERROR"); + } + }); + } + }; + } +} Propchange: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingPartialReadTest.java ------------------------------------------------------------------------------ svn:eol-style = native Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingTest.java (from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java&r1=1447882&r2=1448198&rev=1448198&view=diff 
============================================================================== --- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java (original) +++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingTest.java Wed Feb 20 14:47:06 2013 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.component.file.remote.sftp; +package org.apache.camel.component.file.remote; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; @@ -24,10 +24,10 @@ import org.junit.Test; /** * @version */ -public class SftpSimpleConsumeTest extends SftpServerTestSupport { +public class FtpSimpleConsumeStreamingTest extends FtpServerTestSupport { @Test - public void testSftpSimpleConsume() throws Exception { + public void testFtpSimpleConsumeAbsolute() throws Exception { if (!canTest()) { return; } @@ -35,7 +35,10 @@ public class SftpSimpleConsumeTest exten String expected = "Hello World"; // create file using regular file - template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected, Exchange.FILE_NAME, "hello.txt"); + + // FTP Server does not support absolute path, so lets simulate it + String path = FTP_ROOT_DIR + "/tmp/mytemp"; + template.sendBodyAndHeader("file:" + path, expected, Exchange.FILE_NAME, "hello.txt"); MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); @@ -51,7 +54,9 @@ public class SftpSimpleConsumeTest exten return new RouteBuilder() { @Override public void configure() throws Exception { - from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true") + // notice we use an absolute starting path: /tmp/mytemp + // - we must remember to use // slash because of the url separator + from("ftp://localhost:" + getPort() + 
"//tmp/mytemp?username=admin&password=admin&delay=10s&disconnect=true&streamDownload=true") .routeId("foo").noAutoStartup() .to("mock:result"); } Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingWithMultipleFilesTest.java (from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingWithMultipleFilesTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingWithMultipleFilesTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java&r1=1447882&r2=1448198&rev=1448198&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java (original) +++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingWithMultipleFilesTest.java Wed Feb 20 14:47:06 2013 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.camel.component.file.remote.sftp; +package org.apache.camel.component.file.remote; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; @@ -24,22 +24,27 @@ import org.junit.Test; /** * @version */ -public class SftpSimpleConsumeTest extends SftpServerTestSupport { +public class FtpSimpleConsumeStreamingWithMultipleFilesTest extends FtpServerTestSupport { @Test - public void testSftpSimpleConsume() throws Exception { + public void testFtpSimpleConsumeAbsolute() throws Exception { if (!canTest()) { return; } String expected = "Hello World"; + String expected2 = "Goodbye World"; // create file using regular file - template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected, Exchange.FILE_NAME, "hello.txt"); + + // FTP Server does not support absolute path, so lets simulate it + String path = FTP_ROOT_DIR + "/tmp/mytemp"; + template.sendBodyAndHeader("file:" + path, expected, Exchange.FILE_NAME, "hello.txt"); + template.sendBodyAndHeader("file:" + path, expected2, Exchange.FILE_NAME, "goodbye.txt"); MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedMessageCount(1); - mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt"); + mock.expectedMessageCount(2); + mock.expectedBodiesReceivedInAnyOrder(expected, expected2); context.startRoute("foo"); @@ -51,7 +56,9 @@ public class SftpSimpleConsumeTest exten return new RouteBuilder() { @Override public void configure() throws Exception { - from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true") + // notice we use an absolute starting path: /tmp/mytemp + // - we must remember to use // slash because of the url separator + from("ftp://localhost:" + getPort() + "//tmp/mytemp?username=admin&password=admin&delay=10s&disconnect=true&streamDownload=true") .routeId("foo").noAutoStartup() .to("mock:result"); } Copied: 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingPartialReadTest.java (from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingPartialReadTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingPartialReadTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java&r1=1447882&r2=1448198&rev=1448198&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java (original) +++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingPartialReadTest.java Wed Feb 20 14:47:06 2013 @@ -16,15 +16,21 @@ */ package org.apache.camel.component.file.remote.sftp; +import java.io.File; +import java.io.InputStream; + import org.apache.camel.Exchange; +import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.junit.Test; /** + * Tests that a file move can occur on the server even if the remote stream was only partially read. 
+ * * @version */ -public class SftpSimpleConsumeTest extends SftpServerTestSupport { +public class SftpSimpleConsumeStreamingPartialReadTest extends SftpServerTestSupport { @Test public void testSftpSimpleConsume() throws Exception { @@ -40,10 +46,17 @@ public class SftpSimpleConsumeTest exten MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt"); - + context.startRoute("foo"); assertMockEndpointsSatisfied(); + + // Wait a little bit for the move to finish. + Thread.sleep(2000); + + File resultFile = new File(FTP_ROOT_DIR + File.separator + "failed", "hello.txt"); + assertTrue(resultFile.exists()); + assertFalse(resultFile.isDirectory()); } @Override @@ -51,9 +64,26 @@ public class SftpSimpleConsumeTest exten return new RouteBuilder() { @Override public void configure() throws Exception { - from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true") + from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + + "?username=admin&password=admin&delay=10s&disconnect=true&streamDownload=true" + + "&move=done&moveFailed=failed") .routeId("foo").noAutoStartup() - .to("mock:result"); + .process(new Processor() { + + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().getBody(InputStream.class).read(); + + } + }) + .to("mock:result") + .process(new Processor() { + + @Override + public void process(Exchange exchange) throws Exception { + throw new Exception("INTENTIONAL ERROR"); + } + }); } }; } Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingTest.java (from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java) URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java&r1=1447882&r2=1448198&rev=1448198&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java (original) +++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingTest.java Wed Feb 20 14:47:06 2013 @@ -24,7 +24,7 @@ import org.junit.Test; /** * @version */ -public class SftpSimpleConsumeTest extends SftpServerTestSupport { +public class SftpSimpleConsumeStreamingTest extends SftpServerTestSupport { @Test public void testSftpSimpleConsume() throws Exception { @@ -40,7 +40,8 @@ public class SftpSimpleConsumeTest exten MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt"); - + mock.expectedBodiesReceived(expected); + context.startRoute("foo"); assertMockEndpointsSatisfied(); @@ -51,7 +52,7 @@ public class SftpSimpleConsumeTest exten return new RouteBuilder() { @Override public void configure() throws Exception { - from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true") + from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true&streamDownload=true") .routeId("foo").noAutoStartup() .to("mock:result"); } Copied: 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingWithMultipleFilesTest.java (from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingWithMultipleFilesTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingWithMultipleFilesTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java&r1=1447882&r2=1448198&rev=1448198&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java (original) +++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingWithMultipleFilesTest.java Wed Feb 20 14:47:06 2013 @@ -24,7 +24,7 @@ import org.junit.Test; /** * @version */ -public class SftpSimpleConsumeTest extends SftpServerTestSupport { +public class SftpSimpleConsumeStreamingWithMultipleFilesTest extends SftpServerTestSupport { @Test public void testSftpSimpleConsume() throws Exception { @@ -33,14 +33,16 @@ public class SftpSimpleConsumeTest exten } String expected = "Hello World"; + String expected2 = "Goodbye World"; // create file using regular file template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected, Exchange.FILE_NAME, "hello.txt"); + template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected2, Exchange.FILE_NAME, "goodbye.txt"); MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedMessageCount(1); - mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt"); - + mock.expectedMessageCount(2); + 
mock.expectedBodiesReceivedInAnyOrder(expected, expected2); + context.startRoute("foo"); assertMockEndpointsSatisfied(); @@ -51,7 +53,7 @@ public class SftpSimpleConsumeTest exten return new RouteBuilder() { @Override public void configure() throws Exception { - from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true") + from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true&streamDownload=true") .routeId("foo").noAutoStartup() .to("mock:result"); } Modified: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java?rev=1448198&r1=1448197&r2=1448198&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java (original) +++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java Wed Feb 20 14:47:06 2013 @@ -40,7 +40,8 @@ public class SftpSimpleConsumeTest exten MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt"); - + mock.expectedBodiesReceived(expected); + context.startRoute("foo"); assertMockEndpointsSatisfied();