This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4a75e5d1c61514f515adcde97ee4c06c6e7c5548 Author: Guillaume Nodet <gno...@gmail.com> AuthorDate: Tue May 14 14:43:41 2024 +0200 Filter out null pairs earlier in multicast processor --- .../apache/camel/processor/MulticastProcessor.java | 16 ++-------- ...rParallelWithIteratorThrowingExceptionTest.java | 34 ++++++++++++---------- 2 files changed, 22 insertions(+), 28 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 89f946fa4e7..4f1fe2bdddb 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -69,6 +70,7 @@ import org.apache.camel.support.PatternHelper; import org.apache.camel.support.PluginHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.CastUtils; +import org.apache.camel.util.FilterIterator; import org.apache.camel.util.IOHelper; import org.apache.camel.util.StopWatch; import org.apache.camel.util.concurrent.AsyncCompletionService; @@ -407,7 +409,7 @@ public class MulticastProcessor extends AsyncProcessorSupport this.original = original; this.pairs = pairs; this.callback = callback; - this.iterator = pairs.iterator(); + this.iterator = new FilterIterator<>(pairs.iterator(), Objects::nonNull); if (timeout > 0) { timeoutTask = schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS); } else { @@ -544,13 +546,7 @@ public class MulticastProcessor extends AsyncProcessorSupport ProcessorExchangePair pair = iterator.next(); boolean hasNext = iterator.hasNext(); - // some iterators may return true for hasNext() but then null in next() - if (pair == null && !hasNext) { - doDone(result.get(), true); - return; - } - // TODO looks like pair can still be null as the if above has composite condition? Exchange exchange = pair.getExchange(); int index = nbExchangeSent.getAndIncrement(); updateNewExchange(exchange, index, pairs, hasNext); @@ -654,13 +650,7 @@ public class MulticastProcessor extends AsyncProcessorSupport ProcessorExchangePair pair = iterator.next(); boolean hasNext = iterator.hasNext(); - // some iterators may return true for hasNext() but then null in next() - if (pair == null && !hasNext) { - doDone(result.get(), true); - return false; - } - // TODO looks like pair can still be null as the if above has composite condition? Exchange exchange = pair.getExchange(); int index = nbExchangeSent.getAndIncrement(); updateNewExchange(exchange, index, pairs, hasNext); diff --git a/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java index 17ec46cdd4e..ccd568d9572 100644 --- a/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java @@ -17,7 +17,6 @@ package org.apache.camel.issues; import java.util.Iterator; -import java.util.function.Consumer; import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; @@ -50,11 +49,27 @@ public class SplitterParallelWithIteratorThrowingExceptionTest extends ContextTe @Test public void testIteratorThrowExceptionOnSecond() throws Exception { + getMockEndpoint("mock:line").expectedMessageCount(0); + getMockEndpoint("mock:end").expectedMessageCount(0); + + try { + template.sendBody("direct:start", new MyIterator(2)); + fail("Should throw exception"); + } catch (Exception e) { + IllegalArgumentException iae = assertIsInstanceOf(IllegalArgumentException.class, e.getCause()); + assertEquals("Forced error", iae.getMessage()); + } + + assertMockEndpointsSatisfied(); + } + + @Test + public void testIteratorThrowExceptionOnThird() throws Exception { getMockEndpoint("mock:line").expectedMessageCount(1); getMockEndpoint("mock:end").expectedMessageCount(0); try { - template.sendBody("direct:start", new MyIterator(0)); + template.sendBody("direct:start", new MyIterator(3)); fail("Should throw exception"); } catch (Exception e) { IllegalArgumentException iae = assertIsInstanceOf(IllegalArgumentException.class, e.getCause()); @@ -86,27 +101,16 @@ public class SplitterParallelWithIteratorThrowingExceptionTest extends ContextTe @Override public boolean hasNext() { - return count < 2; + return true; } @Override public String next() { - count++; - if (count == 1) { + if (--count > 0) { return "Hello"; } else { throw new IllegalArgumentException("Forced error"); } } - - @Override - public void remove() { - // noop - } - - @Override - public void forEachRemaining(Consumer<? super String> action) { - // noop - } } }