Author: davsclaus Date: Tue Jul 6 08:20:12 2010 New Revision: 960839 URL: http://svn.apache.org/viewvc?rev=960839&view=rev Log: CAMEL-2899: maxMessagesPerPoll is applied eagerly for the file/ftp components to avoid excessive memory consumption when polling directories with 100.000 or more files.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java camel/trunk/components/camel-spring-security/pom.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=960839&r1=960838&r2=960839&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java Tue Jul 6 08:20:12 2010 @@ -35,7 +35,7 @@ public class FileConsumer extends Generi this.endpointPath = endpoint.getConfiguration().getDirectory(); } - protected void pollDirectory(String fileName, List<GenericFile<File>> fileList) { + protected boolean pollDirectory(String fileName, List<GenericFile<File>> fileList) { if (log.isTraceEnabled()) { log.trace("pollDirectory from fileName: " + fileName); } @@ -45,7 +45,7 @@ public class FileConsumer extends Generi if (log.isDebugEnabled()) { log.debug("Cannot poll as directory does not exists or its not a directory: " + directory); } - return; + return true; } if (log.isTraceEnabled()) { @@ -57,7 +57,7 @@ public class FileConsumer extends Generi if (log.isTraceEnabled()) { log.trace("No files found in directory: " + directory.getPath()); } - return; + return true; } else { // we found some files if (log.isTraceEnabled()) { @@ -66,6 +66,11 @@ public class FileConsumer extends Generi } for (File file : files) { + // check if we can continue polling in files + if (!canPollMoreFiles(fileList)) { + return false; + } + // trace log as Windows/Unix can have different views what the file is? if (log.isTraceEnabled()) { log.trace("Found file: " + file + " [isAbsolute: " + file.isAbsolute() + ", isDirectory: " @@ -79,7 +84,10 @@ public class FileConsumer extends Generi if (endpoint.isRecursive() && isValidFile(gf, true)) { // recursive scan and add the sub files and folders String subDirectory = fileName + File.separator + file.getName(); - pollDirectory(subDirectory, fileList); + boolean canPollMore = pollDirectory(subDirectory, fileList); + if (!canPollMore) { + return false; + } } } else { // Windows can report false to a file on a share so regard it always as a file (if its not a directory) @@ -98,6 +106,8 @@ public class FileConsumer extends Generi } } } + + return true; } /** Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=960839&r1=960838&r2=960839&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Tue Jul 6 08:20:12 2010 @@ -73,9 +73,15 @@ public abstract class GenericFileConsume // gather list of files to process List<GenericFile<T>> files = new ArrayList<GenericFile<T>>(); - String name = endpoint.getConfiguration().getDirectory(); - pollDirectory(name, files); + boolean limitHit = !pollDirectory(name, files); + + // log if we hit the limit + if (limitHit) { + if (log.isDebugEnabled()) { + log.debug("Limiting maximum messages to poll at " + maxMessagesPerPoll + " files as there was more messages in this poll."); + } + } // sort files using file comparator if provided if (endpoint.getSorter() != null) { @@ -180,6 +186,20 @@ public abstract class GenericFileConsume } /** + * Whether or not we can continue polling for more files + * + * @param fileList the current list of gathered files + * @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting maxMessagesPerPoll limit + */ + public boolean canPollMoreFiles(List fileList) { + if (maxMessagesPerPoll > 0 && fileList.size() >= maxMessagesPerPoll) { + return false; + } else { + return true; + } + } + + /** * Override if required. Perform some checks (and perhaps actions) before we * poll. * @@ -202,8 +222,9 @@ public abstract class GenericFileConsume * * @param fileName current directory or file * @param fileList current list of files gathered + * @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been hit */ - protected abstract void pollDirectory(String fileName, List<GenericFile<T>> fileList); + protected abstract boolean pollDirectory(String fileName, List<GenericFile<T>> fileList); /** * Sets the operations to be used. Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=960839&r1=960838&r2=960839&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java (original) +++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java Tue Jul 6 08:20:12 2010 @@ -36,9 +36,12 @@ public class FtpConsumer extends RemoteF this.endpointPath = endpoint.getConfiguration().getDirectory(); } - protected void pollDirectory(String fileName, List<GenericFile<FTPFile>> fileList) { + protected boolean pollDirectory(String fileName, List<GenericFile<FTPFile>> fileList) { + if (log.isTraceEnabled()) { + log.trace("pollDirectory from fileName: " + fileName); + } if (fileName == null) { - return; + return true; } // remove trailing / @@ -48,13 +51,35 @@ public class FtpConsumer extends RemoteF log.trace("Polling directory: " + fileName); } List<FTPFile> files = operations.listFiles(fileName); + if (files == null || files.isEmpty()) { + // no files in this directory to poll + if (log.isTraceEnabled()) { + log.trace("No files found in directory: " + fileName); + } + return true; + } else { + // we found some files + if (log.isTraceEnabled()) { + log.trace("Found " + files.size() + " in directory: " + fileName); + } + } + for (FTPFile file : files) { + + // check if we can continue polling in files + if (!canPollMoreFiles(fileList)) { + return false; + } + if (file.isDirectory()) { RemoteFile<FTPFile> remote = asRemoteFile(fileName, file); if (endpoint.isRecursive() && isValidFile(remote, true)) { // recursive scan and add the sub files and folders - String directory = fileName + "/" + file.getName(); - pollDirectory(directory, fileList); + String subDirectory = fileName + "/" + file.getName(); + boolean canPollMore = pollDirectory(subDirectory, fileList); + if (!canPollMore) { + return false; + } } } else if (file.isFile()) { RemoteFile<FTPFile> remote = asRemoteFile(fileName, file); @@ -72,6 +97,8 @@ public class FtpConsumer extends RemoteF log.debug("Ignoring unsupported remote file type: " + file); } } + + return true; } private RemoteFile<FTPFile> asRemoteFile(String directory, FTPFile file) { Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=960839&r1=960838&r2=960839&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java (original) +++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java Tue Jul 6 08:20:12 2010 @@ -36,9 +36,13 @@ public class SftpConsumer extends Remote this.endpointPath = endpoint.getConfiguration().getDirectory(); } - protected void pollDirectory(String fileName, List<GenericFile<ChannelSftp.LsEntry>> fileList) { + protected boolean pollDirectory(String fileName, List<GenericFile<ChannelSftp.LsEntry>> fileList) { + if (log.isTraceEnabled()) { + log.trace("pollDirectory from fileName: " + fileName); + } + if (fileName == null) { - return; + return true; } // remove trailing / @@ -48,13 +52,35 @@ public class SftpConsumer extends Remote log.trace("Polling directory: " + fileName); } List<ChannelSftp.LsEntry> files = operations.listFiles(fileName); + if (files == null || files.isEmpty()) { + // no files in this directory to poll + if (log.isTraceEnabled()) { + log.trace("No files found in directory: " + fileName); + } + return true; + } else { + // we found some files + if (log.isTraceEnabled()) { + log.trace("Found " + files.size() + " in directory: " + fileName); + } + } + for (ChannelSftp.LsEntry file : files) { + + // check if we can continue polling in files + if (!canPollMoreFiles(fileList)) { + return false; + } + if (file.getAttrs().isDir()) { RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(fileName, file); if (endpoint.isRecursive() && isValidFile(remote, true)) { // recursive scan and add the sub files and folders - String directory = fileName + "/" + file.getFilename(); - pollDirectory(directory, fileList); + String subDirectory = fileName + "/" + file.getFilename(); + boolean canPollMore = pollDirectory(subDirectory, fileList); + if (!canPollMore) { + return false; + } } // we cannot use file.getAttrs().isLink on Windows, so we dont invoke the method // just assuming its a file we should poll @@ -72,6 +98,8 @@ public class SftpConsumer extends Remote } } } + + return true; } private RemoteFile<ChannelSftp.LsEntry> asRemoteFile(String directory, ChannelSftp.LsEntry file) { Modified: camel/trunk/components/camel-spring-security/pom.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring-security/pom.xml?rev=960839&r1=960838&r2=960839&view=diff ============================================================================== --- camel/trunk/components/camel-spring-security/pom.xml (original) +++ camel/trunk/components/camel-spring-security/pom.xml Tue Jul 6 08:20:12 2010 @@ -9,7 +9,7 @@ <artifactId>camel-spring-security</artifactId> <packaging>bundle</packaging> - <name>Camel :: Spring :: Security </name> + <name>Camel :: Spring Security </name> <description>Camel Spring Security support</description> <properties>