Author: davsclaus Date: Tue Jun 28 12:16:57 2011 New Revision: 1140555 URL: http://svn.apache.org/viewvc?rev=1140555&view=rev Log: CAMEL-3655: Reverted the previous work on this, as it is not needed anymore.
Removed: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumerSupport.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1140555&r1=1140554&r2=1140555&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Tue Jun 28 12:16:57 2011 @@ -42,9 +42,9 @@ import org.slf4j.LoggerFactory; */ public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware { protected final transient Logger log = LoggerFactory.getLogger(getClass()); - protected final ProcessFile processFile; protected GenericFileEndpoint<T> endpoint; protected GenericFileOperations<T> operations; + protected boolean loggedIn; protected String fileExpressionResult; protected int maxMessagesPerPoll; protected volatile ShutdownRunningTask shutdownRunningTask; @@ -54,13 +54,6 @@ public abstract class GenericFileConsume super(endpoint, processor); this.endpoint = endpoint; this.operations = operations; - this.processFile = new ProcessFile(this); - } - - @Override - @SuppressWarnings("unchecked") - public GenericFileEndpoint<T> getEndpoint() { - return (GenericFileEndpoint<T>) super.getEndpoint(); } /** @@ -81,7 +74,7 @@ public abstract class GenericFileConsume // gather list of files to process List<GenericFile<T>> files = new ArrayList<GenericFile<T>>(); - String name = 
getEndpoint().getConfiguration().getDirectory(); + String name = endpoint.getConfiguration().getDirectory(); // time how long time it takes to poll StopWatch stop = new StopWatch(); @@ -97,21 +90,21 @@ public abstract class GenericFileConsume } // sort files using file comparator if provided - if (getEndpoint().getSorter() != null) { - Collections.sort(files, getEndpoint().getSorter()); + if (endpoint.getSorter() != null) { + Collections.sort(files, endpoint.getSorter()); } // sort using build in sorters so we can use expressions LinkedList<Exchange> exchanges = new LinkedList<Exchange>(); for (GenericFile<T> file : files) { - Exchange exchange = getEndpoint().createExchange(file); - getEndpoint().configureExchange(exchange); - getEndpoint().configureMessage(file, exchange.getIn()); + Exchange exchange = endpoint.createExchange(file); + endpoint.configureExchange(exchange); + endpoint.configureMessage(file, exchange.getIn()); exchanges.add(exchange); } // sort files using exchange comparator if provided - if (getEndpoint().getSortBy() != null) { - Collections.sort(exchanges, getEndpoint().getSortBy()); + if (endpoint.getSortBy() != null) { + Collections.sort(exchanges, endpoint.getSortBy()); } // consume files one by one @@ -163,7 +156,7 @@ public abstract class GenericFileConsume Exchange exchange = (Exchange) exchanges.poll(); GenericFile<T> file = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE); String key = file.getAbsoluteFilePath(); - getEndpoint().getInProgressRepository().remove(key); + endpoint.getInProgressRepository().remove(key); } return total; @@ -258,22 +251,83 @@ public abstract class GenericFileConsume } /** - * Gets the operations to be used - * - * @return the operations - */ - public GenericFileOperations<T> getOperations() { - return operations; - } - - /** * Processes the exchange * * @param exchange the exchange */ protected void processExchange(final Exchange exchange) { - // let the process do the work - 
processFile.processExchange(exchange); + GenericFile<T> file = getExchangeFileProperty(exchange); + log.trace("Processing file: {}", file); + + // must extract the absolute name before the begin strategy as the file could potentially be pre moved + // and then the file name would be changed + String absoluteFileName = file.getAbsoluteFilePath(); + + // check if we can begin processing the file + try { + final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy(); + + boolean begin = processStrategy.begin(operations, endpoint, exchange, file); + if (!begin) { + log.debug(endpoint + " cannot begin processing file: {}", file); + // begin returned false, so remove file from the in progress list as its no longer in progress + endpoint.getInProgressRepository().remove(absoluteFileName); + return; + } + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug(endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage(), e); + } + endpoint.getInProgressRepository().remove(absoluteFileName); + return; + } + + // must use file from exchange as it can be updated due the + // preMoveNamePrefix/preMoveNamePostfix options + final GenericFile<T> target = getExchangeFileProperty(exchange); + // must use full name when downloading so we have the correct path + final String name = target.getAbsoluteFilePath(); + try { + // retrieve the file using the stream + log.trace("Retrieving file: {} from: {}", name, endpoint); + + // retrieve the file and check it was a success + boolean retrieved = operations.retrieveFile(name, exchange); + if (!retrieved) { + // throw exception to handle the problem with retrieving the file + // then if the method return false or throws an exception is handled the same in here + // as in both cases an exception is being thrown + throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint); + } + + log.trace("Retrieved file: {} from: {}", name, 
endpoint); + + // register on completion callback that does the completion strategies + // (for instance to move the file after we have processed it) + exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName)); + + log.debug("About to process file: {} using exchange: {}", target, exchange); + + // process the exchange using the async consumer to support async routing engine + // which can be supported by this file consumer as all the done work is + // provided in the GenericFileOnCompletion + getAsyncProcessor().process(exchange, new AsyncCallback() { + public void done(boolean doneSync) { + // noop + if (log.isTraceEnabled()) { + log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously"); + } + } + }); + + } catch (Exception e) { + // remove file from the in progress list due to failure + // (cannot be in finally block due to GenericFileOnCompletion will remove it + // from in progress when it takes over and processes the file, which may happen + // by another thread at a later time. So its only safe to remove it if there was an exception) + endpoint.getInProgressRepository().remove(absoluteFileName); + handleException(e); + } } /** @@ -287,7 +341,7 @@ public abstract class GenericFileConsume if (!isMatched(file, isDirectory)) { log.trace("File did not match. Will skip this file: {}", file); return false; - } else if (getEndpoint().isIdempotent() && getEndpoint().getIdempotentRepository().contains(file.getAbsoluteFilePath())) { + } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath())) { log.trace("This consumer is idempotent and the file has been consumed before. 
Will skip this file: {}", file); return false; } @@ -328,26 +382,26 @@ public abstract class GenericFileConsume return true; } - if (getEndpoint().getFilter() != null) { - if (!getEndpoint().getFilter().accept(file)) { + if (endpoint.getFilter() != null) { + if (!endpoint.getFilter().accept(file)) { return false; } } - if (ObjectHelper.isNotEmpty(getEndpoint().getExclude())) { - if (name.matches(getEndpoint().getExclude())) { + if (ObjectHelper.isNotEmpty(endpoint.getExclude())) { + if (name.matches(endpoint.getExclude())) { return false; } } - if (ObjectHelper.isNotEmpty(getEndpoint().getInclude())) { - if (!name.matches(getEndpoint().getInclude())) { + if (ObjectHelper.isNotEmpty(endpoint.getInclude())) { + if (!name.matches(endpoint.getInclude())) { return false; } } // use file expression for a simple dynamic file filter - if (getEndpoint().getFileName() != null) { + if (endpoint.getFileName() != null) { evaluateFileExpression(); if (fileExpressionResult != null) { if (!name.equals(fileExpressionResult)) { @@ -357,19 +411,19 @@ public abstract class GenericFileConsume } // if done file name is enabled, then the file is only valid if a done file exists - if (getEndpoint().getDoneFileName() != null) { + if (endpoint.getDoneFileName() != null) { // done file must be in same path as the file - String doneFileName = getEndpoint().createDoneFileName(file.getAbsoluteFilePath()); - ObjectHelper.notEmpty(doneFileName, "doneFileName", getEndpoint()); + String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath()); + ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint); // is it a done file name? 
- if (getEndpoint().isDoneFile(file.getFileNameOnly())) { + if (endpoint.isDoneFile(file.getFileNameOnly())) { log.trace("Skipping done file: {}", file); return false; } // the file is only valid if the done file exist - if (!getOperations().existsFile(doneFileName)) { + if (!operations.existsFile(doneFileName)) { log.trace("Done file: {} does not exist", doneFileName); return false; } @@ -386,13 +440,13 @@ public abstract class GenericFileConsume */ protected boolean isInProgress(GenericFile<T> file) { String key = file.getAbsoluteFilePath(); - return !getEndpoint().getInProgressRepository().add(key); + return !endpoint.getInProgressRepository().add(key); } private void evaluateFileExpression() { if (fileExpressionResult == null) { // create a dummy exchange as Exchange is needed for expression evaluation - Exchange dummy = new DefaultExchange(getEndpoint().getCamelContext()); + Exchange dummy = new DefaultExchange(endpoint.getCamelContext()); fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class); } } @@ -407,35 +461,6 @@ public abstract class GenericFileConsume super.doStart(); // prepare on startup - getEndpoint().getGenericFileProcessStrategy().prepareOnStartup(getOperations(), getEndpoint()); + endpoint.getGenericFileProcessStrategy().prepareOnStartup(operations, endpoint); } - - /** - * Class the processes the exchange when a file has been polled. 
- */ - private class ProcessFile extends GenericFileConsumerSupport<T> { - - public ProcessFile(GenericFileConsumer<T> consumer) { - super(consumer); - } - - @Override - void handleExceptionStrategy(Exception e) { - // handle the exception on the consumer - handleException(e); - } - - @Override - void processFileStrategy(Exchange exchange) { - // process the exchange using the async consumer to support async routing engine - // which can be supported by this file consumer as all the done work is - // provided in the GenericFileOnCompletion - getAsyncProcessor().process(exchange, new AsyncCallback() { - public void done(boolean doneSync) { - // noop - } - }); - } - } - } Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=1140555&r1=1140554&r2=1140555&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java (original) +++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java Tue Jun 28 12:16:57 2011 @@ -39,7 +39,7 @@ public abstract class RemoteFileConsumer return (RemoteFileEndpoint<T>) super.getEndpoint(); } - public RemoteFileOperations getOperations() { + protected RemoteFileOperations getOperations() { return (RemoteFileOperations) operations; }