This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4a75e5d1c61514f515adcde97ee4c06c6e7c5548
Author: Guillaume Nodet <gno...@gmail.com>
AuthorDate: Tue May 14 14:43:41 2024 +0200

    Filter out null pairs earlier in multicast processor
---
 .../apache/camel/processor/MulticastProcessor.java | 16 ++--------
 ...rParallelWithIteratorThrowingExceptionTest.java | 34 ++++++++++++----------
 2 files changed, 22 insertions(+), 28 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 89f946fa4e7..4f1fe2bdddb 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
@@ -69,6 +70,7 @@ import org.apache.camel.support.PatternHelper;
 import org.apache.camel.support.PluginHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.FilterIterator;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.concurrent.AsyncCompletionService;
@@ -407,7 +409,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
             this.original = original;
             this.pairs = pairs;
             this.callback = callback;
-            this.iterator = pairs.iterator();
+            this.iterator = new FilterIterator<>(pairs.iterator(), 
Objects::nonNull);
             if (timeout > 0) {
                 timeoutTask = schedule(aggregateExecutorService, 
this::timeout, timeout, TimeUnit.MILLISECONDS);
             } else {
@@ -544,13 +546,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
 
                 ProcessorExchangePair pair = iterator.next();
                 boolean hasNext = iterator.hasNext();
-                // some iterators may return true for hasNext() but then null in next()
-                if (pair == null && !hasNext) {
-                    doDone(result.get(), true);
-                    return;
-                }
 
-                // TODO looks like pair can still be null as the if above has composite condition?
                 Exchange exchange = pair.getExchange();
                 int index = nbExchangeSent.getAndIncrement();
                 updateNewExchange(exchange, index, pairs, hasNext);
@@ -654,13 +650,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
 
             ProcessorExchangePair pair = iterator.next();
             boolean hasNext = iterator.hasNext();
-            // some iterators may return true for hasNext() but then null in next()
-            if (pair == null && !hasNext) {
-                doDone(result.get(), true);
-                return false;
-            }
 
-            // TODO looks like pair can still be null as the if above has composite condition?
             Exchange exchange = pair.getExchange();
             int index = nbExchangeSent.getAndIncrement();
             updateNewExchange(exchange, index, pairs, hasNext);
diff --git a/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java
index 17ec46cdd4e..ccd568d9572 100644
--- a/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java
@@ -17,7 +17,6 @@
 package org.apache.camel.issues;
 
 import java.util.Iterator;
-import java.util.function.Consumer;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
@@ -50,11 +49,27 @@ public class SplitterParallelWithIteratorThrowingExceptionTest extends ContextTe
 
     @Test
     public void testIteratorThrowExceptionOnSecond() throws Exception {
+        getMockEndpoint("mock:line").expectedMessageCount(0);
+        getMockEndpoint("mock:end").expectedMessageCount(0);
+
+        try {
+            template.sendBody("direct:start", new MyIterator(2));
+            fail("Should throw exception");
+        } catch (Exception e) {
+            IllegalArgumentException iae = 
assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            assertEquals("Forced error", iae.getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testIteratorThrowExceptionOnThird() throws Exception {
         getMockEndpoint("mock:line").expectedMessageCount(1);
         getMockEndpoint("mock:end").expectedMessageCount(0);
 
         try {
-            template.sendBody("direct:start", new MyIterator(0));
+            template.sendBody("direct:start", new MyIterator(3));
             fail("Should throw exception");
         } catch (Exception e) {
             IllegalArgumentException iae = 
assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
@@ -86,27 +101,16 @@ public class SplitterParallelWithIteratorThrowingExceptionTest extends ContextTe
 
         @Override
         public boolean hasNext() {
-            return count < 2;
+            return true;
         }
 
         @Override
         public String next() {
-            count++;
-            if (count == 1) {
+            if (--count > 0) {
                 return "Hello";
             } else {
                 throw new IllegalArgumentException("Forced error");
             }
         }
-
-        @Override
-        public void remove() {
-            // noop
-        }
-
-        @Override
-        public void forEachRemaining(Consumer<? super String> action) {
-            // noop
-        }
     }
 }

Reply via email to