Author: davsclaus
Date: Thu Mar 11 10:38:49 2010
New Revision: 921790
URL: http://svn.apache.org/viewvc?rev=921790&view=rev
Log:
CAMEL-2538: pollEnrich now hands over the on-completions from the external resource. This
allows the move options of the file/ftp components etc. to be used.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
(with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=921790&r1=921789&r2=921790&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Thu Mar
11 10:38:49 2010
@@ -371,4 +371,11 @@ public interface Exchange {
*/
void addOnCompletion(Synchronization onCompletion);
+ /**
+ * Handover all the on completions from this exchange to the target
exchange.
+ *
+ * @param target the target exchange
+ */
+ void handoverCompletions(Exchange target);
+
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?rev=921790&r1=921789&r2=921790&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
Thu Mar 11 10:38:49 2010
@@ -297,29 +297,43 @@ public final class DefaultExchange imple
public void setUnitOfWork(UnitOfWork unitOfWork) {
this.unitOfWork = unitOfWork;
- if (this.onCompletions != null) {
+ if (onCompletions != null) {
// now an unit of work has been assigned so add the on completions
// we might have registered already
- for (Synchronization onCompletion : this.onCompletions) {
- this.unitOfWork.addSynchronization(onCompletion);
+ for (Synchronization onCompletion : onCompletions) {
+ unitOfWork.addSynchronization(onCompletion);
}
// cleanup the temporary on completion list as they now have been
registered
// on the unit of work
- this.onCompletions.clear();
- this.onCompletions = null;
+ onCompletions.clear();
+ onCompletions = null;
}
}
public void addOnCompletion(Synchronization onCompletion) {
- if (this.unitOfWork == null) {
+ if (unitOfWork == null) {
// unit of work not yet registered so we store the on completion
temporary
// until the unit of work is assigned to this exchange by the
UnitOfWorkProcessor
- if (this.onCompletions == null) {
- this.onCompletions = new ArrayList<Synchronization>();
+ if (onCompletions == null) {
+ onCompletions = new ArrayList<Synchronization>();
}
- this.onCompletions.add(onCompletion);
+ onCompletions.add(onCompletion);
} else {
- this.getUnitOfWork().addSynchronization(onCompletion);
+ getUnitOfWork().addSynchronization(onCompletion);
+ }
+ }
+
+ public void handoverCompletions(Exchange target) {
+ if (onCompletions != null) {
+ for (Synchronization onCompletion : onCompletions) {
+ target.addOnCompletion(onCompletion);
+ }
+ // cleanup the temporary on completion list as they have been
handed over
+ onCompletions.clear();
+ onCompletions = null;
+ } else if (unitOfWork != null) {
+ // let unit of work handover
+ unitOfWork.handoverSynchronization(target);
}
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java?rev=921790&r1=921789&r2=921790&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
Thu Mar 11 10:38:49 2010
@@ -150,6 +150,10 @@ public class PollEnricher extends Servic
if (aggregatedExchange != null) {
// copy aggregation result onto original exchange (preserving
pattern)
copyResultsPreservePattern(exchange, aggregatedExchange);
+ // handover any synchronization
+ if (resourceExchange != null) {
+ resourceExchange.handoverCompletions(exchange);
+ }
}
}
}
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java?rev=921790&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
Thu Mar 11 10:38:49 2010
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class FilePollEnrichTest extends ContextTestSupport {
+
+ @Override
+ protected void setUp() throws Exception {
+ deleteDirectory("target/pollenrich");
+ super.setUp();
+ }
+
+ public void testFilePollEnrich() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+ mock.expectedFileExists("target/pollenrich/done/hello.txt");
+
+ template.sendBodyAndHeader("file:target/pollenrich", "Hello World",
Exchange.FILE_NAME, "hello.txt");
+
+ assertMockEndpointsSatisfied();
+
+ // file should be moved
+ Thread.sleep(1000);
+ File file = new File("target/pollenrich/hello.txt").getAbsoluteFile();
+ assertFalse("File should have been moved", file.exists());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("timer:foo?period=2000").routeId("foo")
+ .pollEnrich("file:target/pollenrich?move=done", 5000)
+ .convertBodyTo(String.class)
+ .to("mock:result")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ // stop route after use to prevent firing timer
again
+ exchange.getContext().stopRoute("foo");
+ }
+ });
+ }
+ };
+ }
+}
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date