CAMEL-9745: Splitter - Should skip null messages if iterator returns null
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/586609f1 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/586609f1 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/586609f1 Branch: refs/heads/camel-2.17.x Commit: 586609f1619ad8266541042d2537d477fee3dee8 Parents: f78935e Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Mar 22 17:51:58 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Mar 22 17:52:26 2016 +0100 ---------------------------------------------------------------------- .../camel/processor/MulticastProcessor.java | 9 ++ .../org/apache/camel/processor/Splitter.java | 48 ++++++----- .../camel/processor/SplitIteratorNullTest.java | 91 ++++++++++++++++++++ .../tarfile/TarSplitterRouteIssueTest.java | 2 - 4 files changed, 127 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/586609f1/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 5f96ba2..fc7da80 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -293,6 +293,11 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor while (it.hasNext()) { final ProcessorExchangePair pair = it.next(); + // in case the iterator returns null then continue to next + if (pair == null) { + continue; + } + final Exchange subExchange = pair.getExchange(); updateNewExchange(subExchange, total.intValue(), pairs, it); @@ -590,6 +595,10 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor while (it.hasNext()) { ProcessorExchangePair pair = it.next(); + // in case the iterator returns null then continue to next + if (pair == null) { + continue; + } Exchange subExchange = pair.getExchange(); updateNewExchange(subExchange, total.get(), pairs, it); http://git-wip-us.apache.org/repos/asf/camel/blob/586609f1/camel-core/src/main/java/org/apache/camel/processor/Splitter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java index 40ca426..fba3f71 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java @@ -180,28 +180,32 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac public ProcessorExchangePair next() { Object part = iterator.next(); - // create a correlated copy as the new exchange to be routed in the splitter from the copy - // and do not share the unit of work - Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false); - // If the splitter has an aggregation strategy - // then the StreamCache created by the child routes must not be - // closed by the unit of work of the child route, but by the unit of - // work of the parent route or grand parent route or grand grand parent route... (in case of nesting). - // Therefore, set the unit of work of the parent route as stream cache unit of work, if not already set. - if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) { - newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, original.getUnitOfWork()); - } - // if we share unit of work, we need to prepare the child exchange - if (isShareUnitOfWork()) { - prepareSharedUnitOfWork(newExchange, copy); - } - if (part instanceof Message) { - newExchange.setIn((Message) part); + if (part != null) { + // create a correlated copy as the new exchange to be routed in the splitter from the copy + // and do not share the unit of work + Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false); + // If the splitter has an aggregation strategy + // then the StreamCache created by the child routes must not be + // closed by the unit of work of the child route, but by the unit of + // work of the parent route or grand parent route or grand grand parent route... (in case of nesting). + // Therefore, set the unit of work of the parent route as stream cache unit of work, if not already set. + if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) { + newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, original.getUnitOfWork()); + } + // if we share unit of work, we need to prepare the child exchange + if (isShareUnitOfWork()) { + prepareSharedUnitOfWork(newExchange, copy); + } + if (part instanceof Message) { + newExchange.setIn((Message) part); + } else { + Message in = newExchange.getIn(); + in.setBody(part); + } + return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext); } else { - Message in = newExchange.getIn(); - in.setBody(part); + return null; } - return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext); } public void remove() { @@ -235,7 +239,9 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairsIterable(exchange, value); try { for (ProcessorExchangePair pair : pairs) { - result.add(pair); + if (pair != null) { + result.add(pair); + } } } finally { if (pairs instanceof Closeable) { http://git-wip-us.apache.org/repos/asf/camel/blob/586609f1/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java new file mode 100644 index 0000000..c44ec85 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.Iterator; +import java.util.function.Consumer; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +public class SplitIteratorNullTest extends ContextTestSupport { + + private MyIterator myIterator = new MyIterator(); + + public void testSplitIteratorNull() throws Exception { + assertFalse(myIterator.isNullReturned()); + getMockEndpoint("mock:line").expectedBodiesReceived("A", "B", "C"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + assertTrue(myIterator.isNullReturned()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .split(constant(myIterator)) + .to("mock:line"); + } + }; + } + + private class MyIterator implements Iterator<String> { + + private int count = 4; + private boolean nullReturned; + + @Override + public boolean hasNext() { + // we return true one extra time, and cause next to return null + return count > 0; + } + + @Override + public String next() { + count--; + if (count == 0) { + nullReturned = true; + return null; + } else if (count == 1) { + return "C"; + } else if (count == 2) { + return "B"; + } else { + return "A"; + } + } + + public boolean isNullReturned() { + return nullReturned; + } + + @Override + public void remove() { + // noop + } + + @Override + public void forEachRemaining(Consumer<? super String> action) { + // noop + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/586609f1/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java ---------------------------------------------------------------------- diff --git a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java index d5702eb..1e38a2f 100644 --- a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java +++ b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java @@ -20,7 +20,6 @@ import java.io.File; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit4.CamelTestSupport; -import org.junit.Ignore; import org.junit.Test; public class TarSplitterRouteIssueTest extends CamelTestSupport { @@ -32,7 +31,6 @@ public class TarSplitterRouteIssueTest extends CamelTestSupport { } @Test - @Ignore("CAMEL-9735: There are 3 files in the .tar file but the TarIterator has a bug causing +1 extra") public void testSplitter() throws Exception { getMockEndpoint("mock:entry").expectedMessageCount(3);