Author: davsclaus Date: Mon Dec 5 12:37:17 2011 New Revision: 1210432 URL: http://svn.apache.org/viewvc?rev=1210432&view=rev Log: CAMEL-4742: TokenizePair as predicate must close input stream. Fixed some tests on slower boxes / Windows.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java?rev=1210432&r1=1210431&r2=1210432&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java Mon Dec 5 12:37:17 2011 @@ -24,6 +24,7 @@ import java.util.Scanner; import org.apache.camel.Exchange; import org.apache.camel.InvalidPayloadException; +import org.apache.camel.Predicate; import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; @@ -51,15 +52,42 @@ public class TokenPairExpressionIterator } @Override + public boolean matches(Exchange exchange) { + // as a predicate we must close the stream, as we do not return an iterator that can be used + // afterwards to iterate the input stream + Object value = doEvaluate(exchange, true); + return ObjectHelper.evaluateValuePredicate(value); + } + + @Override public Object evaluate(Exchange exchange) { + // as we return an iterator to access the input stream, we should not close it + return doEvaluate(exchange, false); + } + + /** + * Strategy to evaluate the exchange + * + * @param exchange the exchange + * @param closeStream whether to close the stream before returning from this method. + * @return the evaluated value + */ + protected Object doEvaluate(Exchange exchange, boolean closeStream) { + InputStream in = null; try { - InputStream in = exchange.getIn().getMandatoryBody(InputStream.class); + in = exchange.getIn().getMandatoryBody(InputStream.class); // we may read from a file, and want to support custom charset defined on the exchange String charset = IOHelper.getCharsetName(exchange); return createIterator(in, charset); } catch (InvalidPayloadException e) { exchange.setException(e); + // must close input stream + IOHelper.close(in); return null; + } finally { + if (closeStream) { + IOHelper.close(in); + } } } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java?rev=1210432&r1=1210431&r2=1210432&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java Mon Dec 5 12:37:17 2011 @@ -48,7 +48,7 @@ public class FileConsumerSuspendAndResum assertMockEndpointsSatisfied(); - Thread.sleep(100); + Thread.sleep(250); // the route is suspended by the policy so we should only receive one String[] files = new File("target/suspended/").getAbsoluteFile().list(); Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java?rev=1210432&r1=1210431&r2=1210432&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java Mon Dec 5 12:37:17 2011 @@ -44,6 +44,8 @@ public class NewFileConsumerTest extends assertMockEndpointsSatisfied(); oneExchangeDone.matchesMockWaitTime(); + Thread.sleep(250); + assertTrue("Should have invoked postPollCheck", myFile.isPost()); } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=1210432&r1=1210431&r2=1210432&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Mon Dec 5 12:37:17 2011 @@ -280,52 +280,6 @@ public class AggregateProcessorTest exte ap.stop(); } - public void testAggregateInitialCompletionInterval() throws Exception { - // camel context must be started - context.start(); - - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("A+B", "C+D"); - mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "interval"); - - Processor done = new SendProcessor(context.getEndpoint("mock:result")); - Expression corr = header("id"); - AggregationStrategy as = new BodyInAggregatingStrategy(); - - AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService); - ap.setCompletionInterval(2000); - ap.start(); - - Exchange e1 = new DefaultExchange(context); - e1.getIn().setBody("A"); - e1.getIn().setHeader("id", 123); - - Exchange e2 = new DefaultExchange(context); - e2.getIn().setBody("B"); - e2.getIn().setHeader("id", 123); - - Exchange e3 = new DefaultExchange(context); - e3.getIn().setBody("C"); - e3.getIn().setHeader("id", 123); - - Exchange e4 = new DefaultExchange(context); - e4.getIn().setBody("D"); - e4.getIn().setHeader("id", 123); - - ap.process(e1); - - Thread.sleep(1500L); - ap.process(e2); - - Thread.sleep(500L); - ap.process(e3); - ap.process(e4); - - assertMockEndpointsSatisfied(); - - ap.stop(); - } - public void testAggregateIgnoreInvalidCorrelationKey() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived("A+C+END");