zregvart commented on a change in pull request #575:
URL: https://github.com/apache/camel-website/pull/575#discussion_r631360303



##########
File path: content/blog/2021/05/multicast-failure-routing/index.md
##########
@@ -0,0 +1,276 @@
+---
+title: "Routing multicast output after encountering partial failures"
+date: 2021-05-10
+draft: false
+authors: [hokutor, mathewsreji]
+categories: ["Usecases"]
+preview: "Routing multicast output after encountering partial failures"
+---
+
+## Problem description
+
+Multicast is a powerful EIP which supports parallel execution paths in 
asynchronous manner. There are various ways a camel user can configure a 
multicast EIP. Check out the extensive documentation 
[here](/components/latest/eips/multicast-eip.html)

Review comment:
       ```suggestion
   Multicast is a powerful EIP which supports parallel execution paths in 
asynchronous manner. There are various ways a Camel user can configure a 
multicast EIP. Check out the extensive documentation 
[here](/components/latest/eips/multicast-eip.html)
   ```

##########
File path: content/blog/2021/05/multicast-failure-routing/index.md
##########
@@ -0,0 +1,276 @@
+---
+title: "Routing multicast output after encountering partial failures"
+date: 2021-05-10
+draft: false
+authors: [hokutor, mathewsreji]
+categories: ["Usecases"]
+preview: "Routing multicast output after encountering partial failures"
+---
+
+## Problem description
+
+Multicast is a powerful EIP which supports parallel execution paths in 
asynchronous manner. There are various ways a camel user can configure a 
multicast EIP. Check out the extensive documentation 
[here](/components/latest/eips/multicast-eip.html)
+1. One can configure it to execute all the child paths independently and continue routing the last reply as the outgoing message (the default behavior unless you provide an aggregation strategy)
+2. Additionally, you can plug in an implementation of [camel aggregation 
strategy](https://github.com/apache/camel/blob/main/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java)
 with user defined logic to aggregate the output from each of those child path 
before continuing further downstream routing. 

Review comment:
       ```suggestion
   2. Additionally, you can plug in an implementation of a [Camel aggregation 
strategy](https://github.com/apache/camel/blob/main/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java)
 with user-defined logic to aggregate the output from each of those child paths 
before continuing further downstream routing. 
   ```

##########
File path: content/blog/2021/05/multicast-failure-routing/index.md
##########
@@ -0,0 +1,276 @@
+---
+title: "Routing multicast output after encountering partial failures"
+date: 2021-05-10
+draft: false
+authors: [hokutor, mathewsreji]
+categories: ["Usecases"]
+preview: "Routing multicast output after encountering partial failures"
+---
+
+## Problem description
+
+Multicast is a powerful EIP which supports parallel execution paths in 
asynchronous manner. There are various ways a camel user can configure a 
multicast EIP. Check out the extensive documentation 
[here](/components/latest/eips/multicast-eip.html)
+1. One can configure to execute all the child paths independently and continue 
routing the last reply as the outgoing message (default behavior unless you 
provide an aggregation strategy)
+2. Additionally, you can plug in an implementation of [camel aggregation 
strategy](https://github.com/apache/camel/blob/main/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java)
 with user defined logic to aggregate the output from each of those child path 
before continuing further downstream routing. 
+
+For the use case discussed below, the requirement is to aggregate the computed results from all child paths before they get routed to the downstream processors in the flow. The idea is to keep routing the aggregated results if at least one child route completes successfully without an exception. We also want to stop routing further if all the child exchanges experienced failures.
+
+## Use case 
+
+Check out the following camel routes 
+
+```java
+@Override
+public void configure() throws Exception {
+    onException(Exception.class)
+        .useOriginalMessage()
+        .handled(true)
+        .log("Exception handler invoked")
+        .transform().constant("{\"data\" : \"err\"}")
+        .end();
+
+    from("jetty:http://localhost:8081/myapi?httpMethodRestrict=GET")
+        .log("received request")
+        .log("Entering multicast")
+        .multicast(new SimpleFlowMergeAggregator())
+        .parallelProcessing().to("direct:A", "direct:B")
+        .end()
+        .log("Aggregated results ${body}")
+        .log("Another log")
+        .transform(simple("{\"result\" : \"success\"}"))
+        .end();
+
+    from("direct:A")
+        .log("Executing PATH_1 - exception path")
+        .transform(constant("DATA_FROM_PATH_1"))
+        .log("Starting exception throw")
+        .throwException(new Exception("USER INITIATED EXCEPTION"))
+        .log("PATH_1")
+        .end();
+
+    from("direct:B")
+        .log("Executing PATH_2 - success path")
+        .delayer(1000)
+        .transform(constant("DATA_FROM_PATH_2"))
+        .log("PATH_2")
+        .end();
+}
+```
+
+
+The following strategy aggregates the output of each multicast child route into a Java list
+
+```java
+public class SimpleFlowMergeAggregator implements AggregationStrategy {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());
+        if(oldExchange == null) {
+            String data = newExchange.getIn().getBody(String.class);
+            List<String> aggregatedDataList = new ArrayList<>();
+            aggregatedDataList.add(data);
+            newExchange.getIn().setBody(aggregatedDataList);
+            return newExchange;
+        }
+
+        List<String> oldData = oldExchange.getIn().getBody(List.class);
+        oldData.add(newExchange.getIn().getBody(String.class));
+        oldExchange.getIn().setBody(oldData);
+
+        return oldExchange;
+    }
+}
+```
+
+On executing the route, we see the following logs:
+```log
+2021-05-06 12:43:18.565  INFO 13956 --- [qtp916897446-42] route1               
                    : received request
+2021-05-06 12:43:18.566  INFO 13956 --- [qtp916897446-42] route1               
                    : Entering multicast
+2021-05-06 12:43:18.575  INFO 13956 --- [ #4 - Multicast] route2               
                    : Executing PATH_1 - exception path
+2021-05-06 12:43:18.575  INFO 13956 --- [ #4 - Multicast] route2               
                    : Starting exception throw
+2021-05-06 12:43:18.578  INFO 13956 --- [ #4 - Multicast] route2               
                    : Exception handler invoked
+2021-05-06 12:43:18.579  INFO 13956 --- [ #4 - Multicast] 
c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator {"data" : "err"}
+2021-05-06 12:43:19.575  INFO 13956 --- [ #3 - Multicast] route3               
                    : Executing PATH_2 - success path
+2021-05-06 12:43:21.576  INFO 13956 --- [ #3 - Multicast] route3               
                    : PATH_2
+2021-05-06 12:43:21.576  INFO 13956 --- [ #3 - Multicast] 
c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator DATA_FROM_PATH_2
+```
+
+## What could take you by surprise?
+
+- When the multicast completes aggregating exchanges from child branches, one might intermittently notice that it stops routing to the remaining processors (the two additional log steps and the transform step in the example above). Tracing the execution, you will notice this happens in a specific scenario: when the very first exchange to arrive in the aggregator (from the first completed child branch) had encountered an exception during its course and/or was handled via onException flows. On the flip side, if the first exchange was successful, routing continued through the remaining processors/steps even when all the remaining exchanges experienced failures.
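This behaviour is order-dependent, and a stdlib-only sketch makes the mechanism visible. The `MiniExchange` class below is a hypothetical stand-in, not the Camel `Exchange` API: it keeps only a body and the "handled by the error handler" flag that the pipeline later inspects. Like the aggregator above, whichever exchange arrives first becomes the base, so the base also carries that first exchange's failure state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.camel.Exchange: only a body and the
// "handled by the error handler" flag that the pipeline later inspects.
class MiniExchange {
    Object body;
    boolean errorHandlerHandled;

    MiniExchange(Object body, boolean errorHandlerHandled) {
        this.body = body;
        this.errorHandlerHandled = errorHandlerHandled;
    }

    // Mirrors SimpleFlowMergeAggregator: the first exchange to arrive becomes
    // the base, and every later body is appended to the base's list.
    static MiniExchange aggregate(MiniExchange oldExchange, MiniExchange newExchange) {
        if (oldExchange == null) {
            List<Object> aggregated = new ArrayList<>();
            aggregated.add(newExchange.body);
            newExchange.body = aggregated;
            return newExchange;
        }
        @SuppressWarnings("unchecked")
        List<Object> aggregated = (List<Object>) oldExchange.body;
        aggregated.add(newExchange.body);
        return oldExchange;
    }
}
```

If the error-handled exchange arrives first, the returned base keeps its failure flag even after successful results are merged in; if a successful exchange arrives first, the base stays clean regardless of later failures.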
+
+
+## Analysis
+
+To understand this better, lets deep dive into the open source codebase. Check 
out PipelineProcessor.java (part of camel-core-processors module). Following 
section of code in the class Pipeline performs an evaluation after every user 
processors (user added steps in a camel flow) on whether it should continue 
routing to the next processor.
+```java
+        @Override
+        public void run() {
+            boolean stop = exchange.isRouteStop();
+            int num = index;
+            boolean more = num < size;
+            boolean first = num == 0;
+
+            if (!stop && more && (first || continueProcessing(exchange, "so 
breaking out of pipeline", LOG))) {
+
+                // prepare for next run
+                if (exchange.hasOut()) {
+                    exchange.setIn(exchange.getOut());
+                    exchange.setOut(null);
+                }
+
+                // get the next processor
+                AsyncProcessor processor = processors.get(index++);
+
+                processor.process(exchange, this);
+            } else {
+                // copyResults is needed in case MEP is OUT and the message is 
not an OUT message
+                ExchangeHelper.copyResults(exchange, exchange);
+
+                // logging nextExchange as it contains the exchange that might 
have altered the payload and since
+                // we are logging the completion if will be confusing if we 
log the original instead
+                // we could also consider logging the original and the 
nextExchange then we have *before* and *after* snapshots
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing complete for exchangeId: {} >>> {}", 
exchange.getExchangeId(), exchange);
+                }
+
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                reactiveExecutor.schedule(cb);
+            }
+        }
+    }
+```
+This decision is made inside the if block <br>
+```if (!stop && more && (first || continueProcessing(exchange, "so breaking 
out of pipeline", LOG)))```
+
+The Pipeline stops routing to the next processor under the following three conditions:
+- A previous processor has marked the exchange to stop the route.
+```boolean stop = exchange.isRouteStop();```
+- There are no more processors left in the pipeline.
+```boolean more = num < size;```
+- PipelineHelper.continueProcessing() evaluates to ```false``` when an exchange encounters any Java exception during the course of routing and gets handled via exception handling routines. Refer to the implementation code below <br>
+```java
+public final class PipelineHelper {
+    public static boolean continueProcessing(Exchange exchange, String 
message, Logger log) {
+        ExtendedExchange ee = (ExtendedExchange) exchange;
+        boolean stop = ee.isFailed() || ee.isRollbackOnly() || 
ee.isRollbackOnlyLast()
+                || (ee.isErrorHandlerHandledSet() && 
ee.isErrorHandlerHandled());
+        if (stop) {
+            if (log.isDebugEnabled()) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("Message exchange has failed: 
").append(message).append(" for exchange: ").append(exchange);
+                if (exchange.isRollbackOnly() || 
exchange.isRollbackOnlyLast()) {
+                    sb.append(" Marked as rollback only.");
+                }
+                if (exchange.getException() != null) {
+                    sb.append(" Exception: ").append(exchange.getException());
+                }
+                if (ee.isErrorHandlerHandledSet() && 
ee.isErrorHandlerHandled()) {
+                    sb.append(" Handled by the error handler.");
+                }
+                log.debug(sb.toString());
+            }
+
+            return false;
+        }
+        if (ee.isRouteStop()) {
+            if (log.isDebugEnabled()) {
+                log.debug("ExchangeId: {} is marked to stop routing: {}", 
exchange.getExchangeId(), exchange);
+            }
+            return false;
+        }
+
+        return true;
+    }
+
+}
+```
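Boiled down to plain booleans (a sketch, not the actual Camel classes), the continue/stop decision above reduces to a small pure function whose parameter names mirror the flags checked in `continueProcessing`:

```java
// Sketch of the Pipeline continuation decision as a pure function.
// Parameter names mirror the flags checked in PipelineHelper.continueProcessing.
class PipelineDecision {
    static boolean continueProcessing(boolean failed, boolean rollbackOnly,
                                      boolean errorHandlerHandled, boolean routeStop) {
        if (failed || rollbackOnly || errorHandlerHandled) {
            return false; // exchange failed, rolled back, or was error-handled: stop
        }
        return !routeStop; // an explicit route stop also halts routing
    }
}
```

An exchange that was handled by the error handler stops the pipeline even though, from the user's perspective, the failure was "handled".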
+ 
+Well, now let's revisit our use case. What if you still want to continue routing?
+- From our above aggregator, you will notice that the very first exchange 
which arrives in aggregator becomes the base exchange on which the aggregator 
continues to pile up body content (with incoming results from other child 
routes). Infact, a lot of camel users follow this pattern of writing an 
aggregator strategy. Unfortunately, if done this way, the state variables set 
on the Exchange object during exception handling get carried forward to the 
next evaluation point in Pipeline and qualify to stop routing. 

Review comment:
       ```suggestion
   - From our above aggregator, you will notice that the very first exchange 
which arrives in aggregator becomes the base exchange on which the aggregator 
continues to pile up body content (with incoming results from other child 
routes). In fact, a lot of Camel users follow this pattern of writing an 
aggregator strategy. Unfortunately, if done this way, the state variables set 
on the Exchange object during exception handling get carried forward to the 
next evaluation point in Pipeline and qualify to stop routing. 
   ```

##########
File path: content/blog/2021/05/multicast-failure-routing/index.md
##########
@@ -0,0 +1,276 @@
+---
+title: "Routing multicast output after encountering partial failures"
+date: 2021-05-10
+draft: false
+authors: [hokutor, mathewsreji]
+categories: ["Usecases"]
+preview: "Routing multicast output after encountering partial failures"
+---
+
+## Problem description
+
+Multicast is a powerful EIP which supports parallel execution paths in 
asynchronous manner. There are various ways a camel user can configure a 
multicast EIP. Check out the extensive documentation 
[here](/components/latest/eips/multicast-eip.html)
+1. One can configure it to execute all the child paths independently and continue routing the last reply as the outgoing message (the default behavior unless you provide an aggregation strategy)
+2. Additionally, you can plug in an implementation of [camel aggregation 
strategy](https://github.com/apache/camel/blob/main/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java)
 with user defined logic to aggregate the output from each of those child path 
before continuing further downstream routing. 
+
+For the use case discussed below, the requirement is to aggregate the computed results from all child paths before they get routed to the downstream processors in the flow. The idea is to keep routing the aggregated results if at least one child route completes successfully without an exception. We also want to stop routing further if all the child exchanges experienced failures.
+
+## Use case 
+
+Check out the following camel routes 
+
+```java
+@Override
+public void configure() throws Exception {
+    onException(Exception.class)
+        .useOriginalMessage()
+        .handled(true)
+        .log("Exception handler invoked")
+        .transform().constant("{\"data\" : \"err\"}")
+        .end();
+
+    from("jetty:http://localhost:8081/myapi?httpMethodRestrict=GET")
+        .log("received request")
+        .log("Entering multicast")
+        .multicast(new SimpleFlowMergeAggregator())
+        .parallelProcessing().to("direct:A", "direct:B")
+        .end()
+        .log("Aggregated results ${body}")
+        .log("Another log")
+        .transform(simple("{\"result\" : \"success\"}"))
+        .end();
+
+    from("direct:A")
+        .log("Executing PATH_1 - exception path")
+        .transform(constant("DATA_FROM_PATH_1"))
+        .log("Starting exception throw")
+        .throwException(new Exception("USER INITIATED EXCEPTION"))
+        .log("PATH_1")
+        .end();
+
+    from("direct:B")
+        .log("Executing PATH_2 - success path")
+        .delayer(1000)
+        .transform(constant("DATA_FROM_PATH_2"))
+        .log("PATH_2")
+        .end();
+}
+```
+
+
+The following strategy aggregates the output of each multicast child route into a Java list
+
+```java
+public class SimpleFlowMergeAggregator implements AggregationStrategy {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());
+        if(oldExchange == null) {
+            String data = newExchange.getIn().getBody(String.class);
+            List<String> aggregatedDataList = new ArrayList<>();
+            aggregatedDataList.add(data);
+            newExchange.getIn().setBody(aggregatedDataList);
+            return newExchange;
+        }
+
+        List<String> oldData = oldExchange.getIn().getBody(List.class);
+        oldData.add(newExchange.getIn().getBody(String.class));
+        oldExchange.getIn().setBody(oldData);
+
+        return oldExchange;
+    }
+}
+```
+
+On executing the route, we see the following logs:
+```log
+2021-05-06 12:43:18.565  INFO 13956 --- [qtp916897446-42] route1               
                    : received request
+2021-05-06 12:43:18.566  INFO 13956 --- [qtp916897446-42] route1               
                    : Entering multicast
+2021-05-06 12:43:18.575  INFO 13956 --- [ #4 - Multicast] route2               
                    : Executing PATH_1 - exception path
+2021-05-06 12:43:18.575  INFO 13956 --- [ #4 - Multicast] route2               
                    : Starting exception throw
+2021-05-06 12:43:18.578  INFO 13956 --- [ #4 - Multicast] route2               
                    : Exception handler invoked
+2021-05-06 12:43:18.579  INFO 13956 --- [ #4 - Multicast] 
c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator {"data" : "err"}
+2021-05-06 12:43:19.575  INFO 13956 --- [ #3 - Multicast] route3               
                    : Executing PATH_2 - success path
+2021-05-06 12:43:21.576  INFO 13956 --- [ #3 - Multicast] route3               
                    : PATH_2
+2021-05-06 12:43:21.576  INFO 13956 --- [ #3 - Multicast] 
c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator DATA_FROM_PATH_2
+```
+
+## What could take you by surprise?
+
+- When the multicast completes aggregating exchanges from child branches, one might intermittently notice that it stops routing to the remaining processors (the two additional log steps and the transform step in the example above). Tracing the execution, you will notice this happens in a specific scenario: when the very first exchange to arrive in the aggregator (from the first completed child branch) had encountered an exception during its course and/or was handled via onException flows. On the flip side, if the first exchange was successful, routing continued through the remaining processors/steps even when all the remaining exchanges experienced failures.
+
+
+## Analysis
+
+To understand this better, lets deep dive into the open source codebase. Check 
out PipelineProcessor.java (part of camel-core-processors module). Following 
section of code in the class Pipeline performs an evaluation after every user 
processors (user added steps in a camel flow) on whether it should continue 
routing to the next processor.

Review comment:
       ```suggestion
   To understand this better let's deep dive into the open source codebase. 
Check out `PipelineProcessor.java` (part of `camel-core-processors` module). 
The following section of code in the class `Pipeline` evaluates after every 
user processor (user added steps in a Camel flow) on whether it should continue 
routing to the next processor.
   ```

##########
File path: content/blog/2021/05/multicast-failure-routing/index.md
##########
@@ -0,0 +1,276 @@
+---
+title: "Routing multicast output after encountering partial failures"
+date: 2021-05-10
+draft: false
+authors: [hokutor, mathewsreji]
+categories: ["Usecases"]
+preview: "Routing multicast output after encountering partial failures"
+---
+
+## Problem description
+
+Multicast is a powerful EIP which supports parallel execution paths in 
asynchronous manner. There are various ways a camel user can configure a 
multicast EIP. Check out the extensive documentation 
[here](/components/latest/eips/multicast-eip.html)
+1. One can configure it to execute all the child paths independently and continue routing the last reply as the outgoing message (the default behavior unless you provide an aggregation strategy)
+2. Additionally, you can plug in an implementation of [camel aggregation 
strategy](https://github.com/apache/camel/blob/main/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java)
 with user defined logic to aggregate the output from each of those child path 
before continuing further downstream routing. 
+
+For the use case discussed below, the requirement is to aggregate the computed results from all child paths before they get routed to the downstream processors in the flow. The idea is to keep routing the aggregated results if at least one child route completes successfully without an exception. We also want to stop routing further if all the child exchanges experienced failures.
+
+## Use case 
+
+Check out the following camel routes 

Review comment:
       ```suggestion
   Check out the following Camel routes 
   ```

##########
File path: content/blog/2021/05/multicast-failure-routing/index.md
##########
@@ -0,0 +1,276 @@
+---
+title: "Routing multicast output after encountering partial failures"
+date: 2021-05-10
+draft: false
+authors: [hokutor, mathewsreji]
+categories: ["Usecases"]
+preview: "Routing multicast output after encountering partial failures"
+---
+
+## Problem description
+
+Multicast is a powerful EIP which supports parallel execution paths in 
asynchronous manner. There are various ways a camel user can configure a 
multicast EIP. Check out the extensive documentation 
[here](/components/latest/eips/multicast-eip.html)

Review comment:
       ```suggestion
   Multicast is a powerful EIP that asynchronously supports parallel execution 
paths. There are various ways a camel user can configure a multicast EIP. Check 
out the extensive documentation 
[here](/components/latest/eips/multicast-eip.html)
   ```

##########
File path: content/blog/2021/05/multicast-failure-routing/index.md
##########
@@ -0,0 +1,276 @@
+---
+title: "Routing multicast output after encountering partial failures"
+date: 2021-05-10
+draft: false
+authors: [hokutor, mathewsreji]
+categories: ["Usecases"]
+preview: "Routing multicast output after encountering partial failures"
+---
+
+## Problem description
+
+Multicast is a powerful EIP which supports parallel execution paths in 
asynchronous manner. There are various ways a camel user can configure a 
multicast EIP. Check out the extensive documentation 
[here](/components/latest/eips/multicast-eip.html)
+1. One can configure it to execute all the child paths independently and continue routing the last reply as the outgoing message (the default behavior unless you provide an aggregation strategy)
+2. Additionally, you can plug in an implementation of [camel aggregation 
strategy](https://github.com/apache/camel/blob/main/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java)
 with user defined logic to aggregate the output from each of those child path 
before continuing further downstream routing. 
+
+For the use case discussed below, the requirement is to aggregate the computed results from all child paths before they get routed to the downstream processors in the flow. The idea is to keep routing the aggregated results if at least one child route completes successfully without an exception. We also want to stop routing further if all the child exchanges experienced failures.
+
+## Use case 
+
+Check out the following camel routes 
+
+```java
+@Override
+public void configure() throws Exception {
+    onException(Exception.class)
+        .useOriginalMessage()
+        .handled(true)
+        .log("Exception handler invoked")
+        .transform().constant("{\"data\" : \"err\"}")
+        .end();
+
+    from("jetty:http://localhost:8081/myapi?httpMethodRestrict=GET")
+        .log("received request")
+        .log("Entering multicast")
+        .multicast(new SimpleFlowMergeAggregator())
+        .parallelProcessing().to("direct:A", "direct:B")
+        .end()
+        .log("Aggregated results ${body}")
+        .log("Another log")
+        .transform(simple("{\"result\" : \"success\"}"))
+        .end();
+
+    from("direct:A")
+        .log("Executing PATH_1 - exception path")
+        .transform(constant("DATA_FROM_PATH_1"))
+        .log("Starting exception throw")
+        .throwException(new Exception("USER INITIATED EXCEPTION"))
+        .log("PATH_1")
+        .end();
+
+    from("direct:B")
+        .log("Executing PATH_2 - success path")
+        .delayer(1000)
+        .transform(constant("DATA_FROM_PATH_2"))
+        .log("PATH_2")
+        .end();
+}
+```
+
+
+The following strategy aggregates the output of each multicast child route into a Java list
+
+```java
+public class SimpleFlowMergeAggregator implements AggregationStrategy {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());
+        if(oldExchange == null) {
+            String data = newExchange.getIn().getBody(String.class);
+            List<String> aggregatedDataList = new ArrayList<>();
+            aggregatedDataList.add(data);
+            newExchange.getIn().setBody(aggregatedDataList);
+            return newExchange;
+        }
+
+        List<String> oldData = oldExchange.getIn().getBody(List.class);
+        oldData.add(newExchange.getIn().getBody(String.class));
+        oldExchange.getIn().setBody(oldData);
+
+        return oldExchange;
+    }
+}
+```
+
+On executing the route, we see the following logs:
+```log
+2021-05-06 12:43:18.565  INFO 13956 --- [qtp916897446-42] route1               
                    : received request
+2021-05-06 12:43:18.566  INFO 13956 --- [qtp916897446-42] route1               
                    : Entering multicast
+2021-05-06 12:43:18.575  INFO 13956 --- [ #4 - Multicast] route2               
                    : Executing PATH_1 - exception path
+2021-05-06 12:43:18.575  INFO 13956 --- [ #4 - Multicast] route2               
                    : Starting exception throw
+2021-05-06 12:43:18.578  INFO 13956 --- [ #4 - Multicast] route2               
                    : Exception handler invoked
+2021-05-06 12:43:18.579  INFO 13956 --- [ #4 - Multicast] 
c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator {"data" : "err"}
+2021-05-06 12:43:19.575  INFO 13956 --- [ #3 - Multicast] route3               
                    : Executing PATH_2 - success path
+2021-05-06 12:43:21.576  INFO 13956 --- [ #3 - Multicast] route3               
                    : PATH_2
+2021-05-06 12:43:21.576  INFO 13956 --- [ #3 - Multicast] 
c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator DATA_FROM_PATH_2
+```
+
+## What could take you by surprise?
+
+- When the multicast completes aggregating exchanges from child branches, one might intermittently notice that it stops routing to the remaining processors (the two additional log steps and the transform step in the example above). Tracing the execution, you will notice this happens in a specific scenario: when the very first exchange to arrive in the aggregator (from the first completed child branch) had encountered an exception during its course and/or was handled via onException flows. On the flip side, if the first exchange was successful, routing continued through the remaining processors/steps even when all the remaining exchanges experienced failures.
+
+
+## Analysis
+
+To understand this better, lets deep dive into the open source codebase. Check 
out PipelineProcessor.java (part of camel-core-processors module). Following 
section of code in the class Pipeline performs an evaluation after every user 
processors (user added steps in a camel flow) on whether it should continue 
routing to the next processor.
+```java
+        @Override
+        public void run() {
+            boolean stop = exchange.isRouteStop();
+            int num = index;
+            boolean more = num < size;
+            boolean first = num == 0;
+
+            if (!stop && more && (first || continueProcessing(exchange, "so 
breaking out of pipeline", LOG))) {
+
+                // prepare for next run
+                if (exchange.hasOut()) {
+                    exchange.setIn(exchange.getOut());
+                    exchange.setOut(null);
+                }
+
+                // get the next processor
+                AsyncProcessor processor = processors.get(index++);
+
+                processor.process(exchange, this);
+            } else {
+                // copyResults is needed in case MEP is OUT and the message is 
not an OUT message
+                ExchangeHelper.copyResults(exchange, exchange);
+
+                // logging nextExchange as it contains the exchange that might 
have altered the payload and since
+                // we are logging the completion if will be confusing if we 
log the original instead
+                // we could also consider logging the original and the 
nextExchange then we have *before* and *after* snapshots
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing complete for exchangeId: {} >>> {}", 
exchange.getExchangeId(), exchange);
+                }
+
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                reactiveExecutor.schedule(cb);
+            }
+        }
+    }
+```
+This decision is made inside the if block <br>
+```if (!stop && more && (first || continueProcessing(exchange, "so breaking 
out of pipeline", LOG)))```
+
+The Pipeline stops routing to the next processor under the following three conditions:
+- A previous processor has marked the exchange to stop the route.
+```boolean stop = exchange.isRouteStop();```
+- There are no more processors left in the pipeline.
+```boolean more = num < size;```
+- PipelineHelper.continueProcessing() evaluates to ```false``` when an exchange encounters any Java exception during the course of routing and gets handled via exception handling routines. Refer to the implementation code below <br>
+```java
+public final class PipelineHelper {
+    public static boolean continueProcessing(Exchange exchange, String 
message, Logger log) {
+        ExtendedExchange ee = (ExtendedExchange) exchange;
+        boolean stop = ee.isFailed() || ee.isRollbackOnly() || 
ee.isRollbackOnlyLast()
+                || (ee.isErrorHandlerHandledSet() && 
ee.isErrorHandlerHandled());
+        if (stop) {
+            if (log.isDebugEnabled()) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("Message exchange has failed: 
").append(message).append(" for exchange: ").append(exchange);
+                if (exchange.isRollbackOnly() || 
exchange.isRollbackOnlyLast()) {
+                    sb.append(" Marked as rollback only.");
+                }
+                if (exchange.getException() != null) {
+                    sb.append(" Exception: ").append(exchange.getException());
+                }
+                if (ee.isErrorHandlerHandledSet() && 
ee.isErrorHandlerHandled()) {
+                    sb.append(" Handled by the error handler.");
+                }
+                log.debug(sb.toString());
+            }
+
+            return false;
+        }
+        if (ee.isRouteStop()) {
+            if (log.isDebugEnabled()) {
+                log.debug("ExchangeId: {} is marked to stop routing: {}", 
exchange.getExchangeId(), exchange);
+            }
+            return false;
+        }
+
+        return true;
+    }
+
+}
+```
+ 
+Well, now let's revisit our use case. What if you still want to continue routing?
+- From our above aggregator, you will notice that the very first exchange 
which arrives in aggregator becomes the base exchange on which the aggregator 
continues to pile up body content (with incoming results from other child 
routes). Infact, a lot of camel users follow this pattern of writing an 
aggregator strategy. Unfortunately, if done this way, the state variables set 
on the Exchange object during exception handling get carried forward to the 
next evaluation point in Pipeline and qualify to stop routing. 

Review comment:
       ```suggestion
   - From our above aggregator, you will notice that the very first exchange 
that arrives in the aggregator becomes the base exchange on which the 
aggregator continues to accumulate body contents (with incoming results from 
other child routes). In fact, a lot of Camel users follow this pattern of 
writing an aggregator strategy. Unfortunately, if done this way, the state 
variables set on the Exchange object during exception handling get carried 
forward to the next evaluation point in Pipeline and qualify to stop routing. 
   ```

##########
File path: content/blog/2021/05/multicast-failure-routing/index.md
##########
@@ -0,0 +1,276 @@
+---
+title: "Routing multicast output after encountering partial failures"
+date: 2021-05-10
+draft: false
+authors: [hokutor, mathewsreji]
+categories: ["Usecases"]
+preview: "Routing multicast output after encountering partial failures"
+---
+
+## Problem description
+
+Multicast is a powerful EIP which supports parallel execution paths in an asynchronous manner. There are various ways a Camel user can configure a multicast EIP. Check out the extensive documentation [here](/components/latest/eips/multicast-eip.html).
+1. One can configure the EIP to execute all the child paths independently and continue routing with the last reply as the outgoing message (the default behavior unless you provide an aggregation strategy)
+2. Additionally, you can plug in an implementation of the [Camel aggregation strategy](https://github.com/apache/camel/blob/main/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java) with user-defined logic to aggregate the output from each of those child paths before continuing further downstream routing.
+
+For the use case discussed below, the requirement is to aggregate the computed results from all child paths before they get routed to the downstream processors in the flow. The idea is to keep routing the aggregated results if at least one child route completes successfully without an exception. We also want to stop routing further if all the child exchanges experienced failures.
+
+## Use case 
+
+Check out the following Camel routes:
+
+```java
+@Override
+public void configure() throws Exception {
+    onException(Exception.class)
+        .useOriginalMessage()
+        .handled(true)
+        .log("Exception handler invoked")
+        .transform().constant("{\"data\" : \"err\"}")
+        .end();
+
+    from("jetty:http://localhost:8081/myapi?httpMethodRestrict=GET")
+        .log("received request")
+        .log("Entering multicast")
+        .multicast(new SimpleFlowMergeAggregator())
+        .parallelProcessing().to("direct:A", "direct:B")
+        .end()
+        .log("Aggregated results ${body}")
+        .log("Another log")
+        .transform(simple("{\"result\" : \"success\"}"))
+        .end();
+
+    from("direct:A")
+        .log("Executing PATH_1 - exception path")
+        .transform(constant("DATA_FROM_PATH_1"))
+        .log("Starting exception throw")
+        .throwException(new Exception("USER INITIATED EXCEPTION"))
+        .log("PATH_1")
+        .end();
+
+    from("direct:B")
+        .log("Executing PATH_2 - success path")
+        .delayer(1000)
+        .transform(constant("DATA_FROM_PATH_2"))
+        .log("PATH_2")
+        .end();
+}
+```
+
+
+The following strategy aggregates the output of each multicast child route into a Java list:
+
+```java
+public class SimpleFlowMergeAggregator implements AggregationStrategy {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());
+        if(oldExchange == null) {
+            String data = newExchange.getIn().getBody(String.class);
+            List<String> aggregatedDataList = new ArrayList<>();
+            aggregatedDataList.add(data);
+            newExchange.getIn().setBody(aggregatedDataList);
+            return newExchange;
+        }
+
+        List<String> oldData = oldExchange.getIn().getBody(List.class);
+        oldData.add(newExchange.getIn().getBody(String.class));
+        oldExchange.getIn().setBody(oldData);
+
+        return oldExchange;
+    }
+}
+```
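As an aside, the accumulation pattern used above can be seen in isolation. The sketch below is a framework-free analogue (the class and method names are hypothetical, not Camel API): a plain `List<String>` stands in for the exchange body, and a `null` base plays the role of `oldExchange == null` on the first aggregation callback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, framework-free analogue of SimpleFlowMergeAggregator:
// the first result seeds the list; every later result is appended to it.
public class MergeSketch {

    // base == null plays the role of oldExchange == null in the Camel strategy
    public static List<String> aggregate(List<String> base, String incoming) {
        if (base == null) {
            List<String> seeded = new ArrayList<>();
            seeded.add(incoming);
            return seeded; // the first arrival becomes the base
        }
        base.add(incoming); // later arrivals pile onto the existing base
        return base;
    }

    public static void main(String[] args) {
        List<String> agg = aggregate(null, "{\"data\" : \"err\"}");
        agg = aggregate(agg, "DATA_FROM_PATH_2");
        System.out.println(agg); // [{"data" : "err"}, DATA_FROM_PATH_2]
    }
}
```

Note that whichever result arrives first permanently becomes the carrier of the accumulated state, which is exactly why the identity of the first exchange matters later in this article.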
+
+On executing this, we see the following logs:
+```log
+2021-05-06 12:43:18.565  INFO 13956 --- [qtp916897446-42] route1               
                    : received request
+2021-05-06 12:43:18.566  INFO 13956 --- [qtp916897446-42] route1               
                    : Entering multicast
+2021-05-06 12:43:18.575  INFO 13956 --- [ #4 - Multicast] route2               
                    : Executing PATH_1 - exception path
+2021-05-06 12:43:18.575  INFO 13956 --- [ #4 - Multicast] route2               
                    : Starting exception throw
+2021-05-06 12:43:18.578  INFO 13956 --- [ #4 - Multicast] route2               
                    : Exception handler invoked
+2021-05-06 12:43:18.579  INFO 13956 --- [ #4 - Multicast] 
c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator {"data" : "err"}
+2021-05-06 12:43:19.575  INFO 13956 --- [ #3 - Multicast] route3               
                    : Executing PATH_2 - success path
+2021-05-06 12:43:21.576  INFO 13956 --- [ #3 - Multicast] route3               
                    : PATH_2
+2021-05-06 12:43:21.576  INFO 13956 --- [ #3 - Multicast] 
c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator DATA_FROM_PATH_2
+```
+
+## What could take you by surprise?
+
+- When the multicast completes aggregating exchanges from the child branches, one might intermittently notice that it stops routing to the remaining processors (the two additional log steps and the transform step in the above example). On tracing the execution, you will notice this happens in a special scenario: when the very first exchange that arrives in the aggregator (from the first completed child branch) had encountered an exception during its course and/or was handled via onException flows. On the flip side, if the first exchange was successful, then even if all the remaining ones experienced a failure, routing continued through the remaining processors/steps.
+
+
+## Analysis
+
+To understand this better, let's deep-dive into the open-source codebase. Check out Pipeline.java (part of the camel-core-processors module). The following section of code in the class Pipeline performs an evaluation after every user processor (user-added steps in a Camel flow) on whether it should continue routing to the next processor.
+```java
+        @Override
+        public void run() {
+            boolean stop = exchange.isRouteStop();
+            int num = index;
+            boolean more = num < size;
+            boolean first = num == 0;
+
+            if (!stop && more && (first || continueProcessing(exchange, "so 
breaking out of pipeline", LOG))) {
+
+                // prepare for next run
+                if (exchange.hasOut()) {
+                    exchange.setIn(exchange.getOut());
+                    exchange.setOut(null);
+                }
+
+                // get the next processor
+                AsyncProcessor processor = processors.get(index++);
+
+                processor.process(exchange, this);
+            } else {
+                // copyResults is needed in case MEP is OUT and the message is 
not an OUT message
+                ExchangeHelper.copyResults(exchange, exchange);
+
+                // logging nextExchange as it contains the exchange that might 
have altered the payload and since
+                // we are logging the completion if will be confusing if we 
log the original instead
+                // we could also consider logging the original and the 
nextExchange then we have *before* and *after* snapshots
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing complete for exchangeId: {} >>> {}", 
exchange.getExchangeId(), exchange);
+                }
+
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                reactiveExecutor.schedule(cb);
+            }
+        }
+    }
+```
+This decision is made inside the if block <br>
+```if (!stop && more && (first || continueProcessing(exchange, "so breaking 
out of pipeline", LOG)))```
+
+The Pipeline stops routing to the next processor under any of the following three conditions:
+- If previous processors have marked route stop on the exchange object.
+```boolean stop = exchange.isRouteStop();```
+- There are no more processors in the pipeline
+```boolean more = num < size;```
+- ```PipelineHelper.continueProcessing()``` evaluates to ```false``` when an exchange encounters a Java exception during the course of routing and gets handled via exception handling routines. Refer to the implementation code below. <br>
+```java
+public final class PipelineHelper {
+    public static boolean continueProcessing(Exchange exchange, String 
message, Logger log) {
+        ExtendedExchange ee = (ExtendedExchange) exchange;
+        boolean stop = ee.isFailed() || ee.isRollbackOnly() || 
ee.isRollbackOnlyLast()
+                || (ee.isErrorHandlerHandledSet() && 
ee.isErrorHandlerHandled());
+        if (stop) {
+            if (log.isDebugEnabled()) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("Message exchange has failed: 
").append(message).append(" for exchange: ").append(exchange);
+                if (exchange.isRollbackOnly() || 
exchange.isRollbackOnlyLast()) {
+                    sb.append(" Marked as rollback only.");
+                }
+                if (exchange.getException() != null) {
+                    sb.append(" Exception: ").append(exchange.getException());
+                }
+                if (ee.isErrorHandlerHandledSet() && 
ee.isErrorHandlerHandled()) {
+                    sb.append(" Handled by the error handler.");
+                }
+                log.debug(sb.toString());
+            }
+
+            return false;
+        }
+        if (ee.isRouteStop()) {
+            if (log.isDebugEnabled()) {
+                log.debug("ExchangeId: {} is marked to stop routing: {}", 
exchange.getExchangeId(), exchange);
+            }
+            return false;
+        }
+
+        return true;
+    }
+
+}
+```
+ 
+Well, now let's revisit our use case. What if you still want to continue routing?
+- From our above aggregator, you will notice that the very first exchange that arrives in the aggregator becomes the base exchange on which the aggregator continues to accumulate body contents (with incoming results from other child routes). In fact, a lot of Camel users follow this pattern of writing an aggregator strategy. Unfortunately, if done this way, the state variables set on the Exchange object during exception handling get carried forward to the next evaluation point in the Pipeline and qualify to stop routing.
+<br>
+
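Before moving to the fix, the continuation check can be distilled into a plain boolean expression. The sketch below is a simplified, hypothetical distillation (not the actual Camel code): `errorHandled` stands in for the flags that `continueProcessing()` inspects, such as `isFailed()` and `isErrorHandlerHandled()`.

```java
// Hypothetical distillation of the Pipeline continuation check:
//   !stop && more && (first || continueProcessing(...))
// 'errorHandled' stands in for the error flags continueProcessing() inspects.
public class PipelineDecisionSketch {

    public static boolean shouldContinue(boolean routeStop, boolean more,
                                         boolean first, boolean errorHandled) {
        return !routeStop && more && (first || !errorHandled);
    }

    public static void main(String[] args) {
        // the first check passes regardless of error flags
        System.out.println(shouldContinue(false, true, true, true));  // true
        // a carried-forward handled error fails any non-first check
        System.out.println(shouldContinue(false, true, false, true)); // false
        // a clean exchange keeps routing
        System.out.println(shouldContinue(false, true, false, false)); // true
    }
}
```

This makes the observed behavior plain: once the aggregated (base) exchange carries a handled-error flag, the pipeline stops advancing at the next non-first evaluation point.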
+## Solution 
+
+There are many ways a user could neutralize the states set by the exception 
handling framework. However, for the scope of this article, we chose the 
following strategy. 
+- If the first child route's exchange never encountered an exception, then continue processing the rest of the aggregation cycle as usual.
+- If the first child encountered an exception, then introspect the incoming exchanges for a success case. If found, shift the base to be the first successful exchange, move the aggregated results onto it, and continue the rest of the aggregation lifecycle as usual.
+
+Updated AggregationStrategy
+```java
+public class SimpleFlowMergeAggregator implements AggregationStrategy {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());
+        if(oldExchange == null) {
+            String data = newExchange.getIn().getBody(String.class);
+            List<String> aggregatedDataList = new ArrayList<>();
+            aggregatedDataList.add(data);
+            newExchange.getIn().setBody(aggregatedDataList);
+            return newExchange;
+        }
+
+        if(hadException(oldExchange)) {
+            if(!hadException(newExchange)) {
+                // aggregate and swap the base
+                LOGGER.info("Found new exchange with success. swapping the 
base exchange");
+                List<String> oldData = oldExchange.getIn().getBody(List.class);
+                oldData.add(newExchange.getIn().getBody(String.class));
+                newExchange.getIn().setBody(oldData); // swapped the base here
+                return newExchange;
+            }
+        }
+
+        List<String> oldData = oldExchange.getIn().getBody(List.class);
+        oldData.add(newExchange.getIn().getBody(String.class));
+        oldExchange.getIn().setBody(oldData);
+
+        return oldExchange;
+    }
+
+
+    private boolean hadException(Exchange exchange) {
+
+        if(exchange.isFailed()) {
+            return true;
+        }
+
+        if(exchange.isRollbackOnly()) {
+            return true;
+        }
+
+        if(exchange.isRollbackOnlyLast()) {
+            return true;
+        }
+
+        if(((ExtendedExchange)exchange).isErrorHandlerHandledSet()
+                && ((ExtendedExchange)exchange).isErrorHandlerHandled()) {
+            return true;
+        }
+
+        return false;
+    }
+}
+````

Review comment:
       There is one extra backtick here and I can't make a suggestion for it, 
we need three not four here.

##########
File path: content/blog/2021/05/multicast-failure-routing/index.md
##########
@@ -0,0 +1,276 @@
+---
+title: "Routing multicast output after encountering partial failures"
+date: 2021-05-10
+draft: false
+authors: [hokutor, mathewsreji]
+categories: ["Usecases"]
+preview: "Routing multicast output after encountering partial failures"
+---
+
+## Problem description
+
+Multicast is a powerful EIP which supports parallel execution paths in an asynchronous manner. There are various ways a Camel user can configure a multicast EIP. Check out the extensive documentation [here](/components/latest/eips/multicast-eip.html).
+1. One can configure the EIP to execute all the child paths independently and continue routing with the last reply as the outgoing message (the default behavior unless you provide an aggregation strategy)
+2. Additionally, you can plug in an implementation of the [Camel aggregation strategy](https://github.com/apache/camel/blob/main/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java) with user-defined logic to aggregate the output from each of those child paths before continuing further downstream routing.
+
+For the use case discussed below, the requirement is to aggregate the computed results from all child paths before they get routed to the downstream processors in the flow. The idea is to keep routing the aggregated results if at least one child route completes successfully without an exception. We also want to stop routing further if all the child exchanges experienced failures.
+
+## Use case 
+
+Check out the following Camel routes:
+
+```java
+@Override
+public void configure() throws Exception {
+    onException(Exception.class)
+        .useOriginalMessage()
+        .handled(true)
+        .log("Exception handler invoked")
+        .transform().constant("{\"data\" : \"err\"}")
+        .end();
+
+    from("jetty:http://localhost:8081/myapi?httpMethodRestrict=GET")
+        .log("received request")
+        .log("Entering multicast")
+        .multicast(new SimpleFlowMergeAggregator())
+        .parallelProcessing().to("direct:A", "direct:B")
+        .end()
+        .log("Aggregated results ${body}")
+        .log("Another log")
+        .transform(simple("{\"result\" : \"success\"}"))
+        .end();
+
+    from("direct:A")
+        .log("Executing PATH_1 - exception path")
+        .transform(constant("DATA_FROM_PATH_1"))
+        .log("Starting exception throw")
+        .throwException(new Exception("USER INITIATED EXCEPTION"))
+        .log("PATH_1")
+        .end();
+
+    from("direct:B")
+        .log("Executing PATH_2 - success path")
+        .delayer(1000)
+        .transform(constant("DATA_FROM_PATH_2"))
+        .log("PATH_2")
+        .end();
+}
+```
+
+
+The following strategy aggregates the output of each multicast child route into a Java list:
+
+```java
+public class SimpleFlowMergeAggregator implements AggregationStrategy {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());
+        if(oldExchange == null) {
+            String data = newExchange.getIn().getBody(String.class);
+            List<String> aggregatedDataList = new ArrayList<>();
+            aggregatedDataList.add(data);
+            newExchange.getIn().setBody(aggregatedDataList);
+            return newExchange;
+        }
+
+        List<String> oldData = oldExchange.getIn().getBody(List.class);
+        oldData.add(newExchange.getIn().getBody(String.class));
+        oldExchange.getIn().setBody(oldData);
+
+        return oldExchange;
+    }
+}
+```
+
+On executing this, we see the following logs:
+```log
+2021-05-06 12:43:18.565  INFO 13956 --- [qtp916897446-42] route1               
                    : received request
+2021-05-06 12:43:18.566  INFO 13956 --- [qtp916897446-42] route1               
                    : Entering multicast
+2021-05-06 12:43:18.575  INFO 13956 --- [ #4 - Multicast] route2               
                    : Executing PATH_1 - exception path
+2021-05-06 12:43:18.575  INFO 13956 --- [ #4 - Multicast] route2               
                    : Starting exception throw
+2021-05-06 12:43:18.578  INFO 13956 --- [ #4 - Multicast] route2               
                    : Exception handler invoked
+2021-05-06 12:43:18.579  INFO 13956 --- [ #4 - Multicast] 
c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator {"data" : "err"}
+2021-05-06 12:43:19.575  INFO 13956 --- [ #3 - Multicast] route3               
                    : Executing PATH_2 - success path
+2021-05-06 12:43:21.576  INFO 13956 --- [ #3 - Multicast] route3               
                    : PATH_2
+2021-05-06 12:43:21.576  INFO 13956 --- [ #3 - Multicast] 
c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator DATA_FROM_PATH_2
+```
+
+## What could take you by surprise?
+
+- When the multicast completes aggregating exchanges from the child branches, one might intermittently notice that it stops routing to the remaining processors (the two additional log steps and the transform step in the above example). On tracing the execution, you will notice this happens in a special scenario: when the very first exchange that arrives in the aggregator (from the first completed child branch) had encountered an exception during its course and/or was handled via onException flows. On the flip side, if the first exchange was successful, then even if all the remaining ones experienced a failure, routing continued through the remaining processors/steps.
+
+
+## Analysis
+
+To understand this better, let's deep-dive into the open-source codebase. Check out Pipeline.java (part of the camel-core-processors module). The following section of code in the class Pipeline performs an evaluation after every user processor (user-added steps in a Camel flow) on whether it should continue routing to the next processor.
+```java
+        @Override
+        public void run() {
+            boolean stop = exchange.isRouteStop();
+            int num = index;
+            boolean more = num < size;
+            boolean first = num == 0;
+
+            if (!stop && more && (first || continueProcessing(exchange, "so 
breaking out of pipeline", LOG))) {
+
+                // prepare for next run
+                if (exchange.hasOut()) {
+                    exchange.setIn(exchange.getOut());
+                    exchange.setOut(null);
+                }
+
+                // get the next processor
+                AsyncProcessor processor = processors.get(index++);
+
+                processor.process(exchange, this);
+            } else {
+                // copyResults is needed in case MEP is OUT and the message is 
not an OUT message
+                ExchangeHelper.copyResults(exchange, exchange);
+
+                // logging nextExchange as it contains the exchange that might 
have altered the payload and since
+                // we are logging the completion if will be confusing if we 
log the original instead
+                // we could also consider logging the original and the 
nextExchange then we have *before* and *after* snapshots
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing complete for exchangeId: {} >>> {}", 
exchange.getExchangeId(), exchange);
+                }
+
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                reactiveExecutor.schedule(cb);
+            }
+        }
+    }
+```
+This decision is made inside the if block <br>
+```if (!stop && more && (first || continueProcessing(exchange, "so breaking 
out of pipeline", LOG)))```
+
+The Pipeline stops routing to the next processor under any of the following three conditions:
+- If previous processors have marked route stop on the exchange object.
+```boolean stop = exchange.isRouteStop();```
+- There are no more processors in the pipeline
+```boolean more = num < size;```
+- ```PipelineHelper.continueProcessing()``` evaluates to ```false``` when an exchange encounters a Java exception during the course of routing and gets handled via exception handling routines. Refer to the implementation code below. <br>
+```java
+public final class PipelineHelper {
+    public static boolean continueProcessing(Exchange exchange, String 
message, Logger log) {
+        ExtendedExchange ee = (ExtendedExchange) exchange;
+        boolean stop = ee.isFailed() || ee.isRollbackOnly() || 
ee.isRollbackOnlyLast()
+                || (ee.isErrorHandlerHandledSet() && 
ee.isErrorHandlerHandled());
+        if (stop) {
+            if (log.isDebugEnabled()) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("Message exchange has failed: 
").append(message).append(" for exchange: ").append(exchange);
+                if (exchange.isRollbackOnly() || 
exchange.isRollbackOnlyLast()) {
+                    sb.append(" Marked as rollback only.");
+                }
+                if (exchange.getException() != null) {
+                    sb.append(" Exception: ").append(exchange.getException());
+                }
+                if (ee.isErrorHandlerHandledSet() && 
ee.isErrorHandlerHandled()) {
+                    sb.append(" Handled by the error handler.");
+                }
+                log.debug(sb.toString());
+            }
+
+            return false;
+        }
+        if (ee.isRouteStop()) {
+            if (log.isDebugEnabled()) {
+                log.debug("ExchangeId: {} is marked to stop routing: {}", 
exchange.getExchangeId(), exchange);
+            }
+            return false;
+        }
+
+        return true;
+    }
+
+}
+```
+ 
+Well, now let's revisit our use case. What if you still want to continue routing?
+- From our above aggregator, you will notice that the very first exchange that arrives in the aggregator becomes the base exchange on which the aggregator continues to accumulate body contents (with incoming results from other child routes). In fact, a lot of Camel users follow this pattern of writing an aggregator strategy. Unfortunately, if done this way, the state variables set on the Exchange object during exception handling get carried forward to the next evaluation point in the Pipeline and qualify to stop routing.
+<br>
+
+## Solution 
+
+There are many ways a user could neutralize the states set by the exception 
handling framework. However, for the scope of this article, we chose the 
following strategy. 
+- If the first child route's exchange never encountered an exception, then continue processing the rest of the aggregation cycle as usual.
+- If the first child encountered an excetion, then introspect the incoming 
exchanges for success case. If found, shift the base to be the first succeess 
exchange and move the aggregated results on to it and continue the rest of 
aggregation lifecycle as usual.

Review comment:
       ```suggestion
   - If the first child encountered an exception, then introspect the incoming 
exchanges for success case. If found, shift the base to be the first successful 
exchange and move the aggregated results on to it and continue the rest of the 
aggregation lifecycle as usual.
   ```

##########
File path: content/blog/2021/05/multicast-failure-routing/index.md
##########
@@ -0,0 +1,276 @@
+---
+title: "Routing multicast output after encountering partial failures"
+date: 2021-05-10
+draft: false
+authors: [hokutor, mathewsreji]
+categories: ["Usecases"]
+preview: "Routing multicast output after encountering partial failures"
+---
+
+## Problem description
+
+Multicast is a powerful EIP which supports parallel execution paths in an asynchronous manner. There are various ways a Camel user can configure a multicast EIP. Check out the extensive documentation [here](/components/latest/eips/multicast-eip.html).
+1. One can configure the EIP to execute all the child paths independently and continue routing with the last reply as the outgoing message (the default behavior unless you provide an aggregation strategy)
+2. Additionally, you can plug in an implementation of the [Camel aggregation strategy](https://github.com/apache/camel/blob/main/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java) with user-defined logic to aggregate the output from each of those child paths before continuing further downstream routing.
+
+For the use case discussed below, the requirement is to aggregate the computed results from all child paths before they get routed to the downstream processors in the flow. The idea is to keep routing the aggregated results if at least one child route completes successfully without an exception. We also want to stop routing further if all the child exchanges experienced failures.
+
+## Use case 
+
+Check out the following Camel routes:
+
+```java
+@Override
+public void configure() throws Exception {
+    onException(Exception.class)
+        .useOriginalMessage()
+        .handled(true)
+        .log("Exception handler invoked")
+        .transform().constant("{\"data\" : \"err\"}")
+        .end();
+
+    from("jetty:http://localhost:8081/myapi?httpMethodRestrict=GET")
+        .log("received request")
+        .log("Entering multicast")
+        .multicast(new SimpleFlowMergeAggregator())
+        .parallelProcessing().to("direct:A", "direct:B")
+        .end()
+        .log("Aggregated results ${body}")
+        .log("Another log")
+        .transform(simple("{\"result\" : \"success\"}"))
+        .end();
+
+    from("direct:A")
+        .log("Executing PATH_1 - exception path")
+        .transform(constant("DATA_FROM_PATH_1"))
+        .log("Starting exception throw")
+        .throwException(new Exception("USER INITIATED EXCEPTION"))
+        .log("PATH_1")
+        .end();
+
+    from("direct:B")
+        .log("Executing PATH_2 - success path")
+        .delayer(1000)
+        .transform(constant("DATA_FROM_PATH_2"))
+        .log("PATH_2")
+        .end();
+}
+```
+
+
+The following strategy aggregates the output of each multicast child route into a Java list:
+
+```java
+public class SimpleFlowMergeAggregator implements AggregationStrategy {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());
+        if(oldExchange == null) {
+            String data = newExchange.getIn().getBody(String.class);
+            List<String> aggregatedDataList = new ArrayList<>();
+            aggregatedDataList.add(data);
+            newExchange.getIn().setBody(aggregatedDataList);
+            return newExchange;
+        }
+
+        List<String> oldData = oldExchange.getIn().getBody(List.class);
+        oldData.add(newExchange.getIn().getBody(String.class));
+        oldExchange.getIn().setBody(oldData);
+
+        return oldExchange;
+    }
+}
+```
+
+On executing this, we see the following logs:
+```log
+2021-05-06 12:43:18.565  INFO 13956 --- [qtp916897446-42] route1               
                    : received request
+2021-05-06 12:43:18.566  INFO 13956 --- [qtp916897446-42] route1               
                    : Entering multicast
+2021-05-06 12:43:18.575  INFO 13956 --- [ #4 - Multicast] route2               
                    : Executing PATH_1 - exception path
+2021-05-06 12:43:18.575  INFO 13956 --- [ #4 - Multicast] route2               
                    : Starting exception throw
+2021-05-06 12:43:18.578  INFO 13956 --- [ #4 - Multicast] route2               
                    : Exception handler invoked
+2021-05-06 12:43:18.579  INFO 13956 --- [ #4 - Multicast] 
c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator {"data" : "err"}
+2021-05-06 12:43:19.575  INFO 13956 --- [ #3 - Multicast] route3               
                    : Executing PATH_2 - success path
+2021-05-06 12:43:21.576  INFO 13956 --- [ #3 - Multicast] route3               
                    : PATH_2
+2021-05-06 12:43:21.576  INFO 13956 --- [ #3 - Multicast] 
c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator DATA_FROM_PATH_2
+```
+
+## What could take you by surprise?
+
+- When the multicast completes aggregating exchanges from the child branches, one might intermittently notice that it stops routing to the remaining processors (the two additional log steps and the transform step in the above example). On tracing the execution, you will notice this happens in a special scenario: when the very first exchange that arrives in the aggregator (from the first completed child branch) had encountered an exception during its course and/or was handled via onException flows. On the flip side, if the first exchange was successful, then even if all the remaining ones experienced a failure, routing continued through the remaining processors/steps.
+
+
+## Analysis
+
+To understand this better, let's deep-dive into the open-source codebase. Check out Pipeline.java (part of the camel-core-processors module). The following section of code in the class Pipeline performs an evaluation after every user processor (user-added steps in a Camel flow) on whether it should continue routing to the next processor.
+```java
+        @Override
+        public void run() {
+            boolean stop = exchange.isRouteStop();
+            int num = index;
+            boolean more = num < size;
+            boolean first = num == 0;
+
+            if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) {
+
+                // prepare for next run
+                if (exchange.hasOut()) {
+                    exchange.setIn(exchange.getOut());
+                    exchange.setOut(null);
+                }
+
+                // get the next processor
+                AsyncProcessor processor = processors.get(index++);
+
+                processor.process(exchange, this);
+            } else {
+                // copyResults is needed in case MEP is OUT and the message is not an OUT message
+                ExchangeHelper.copyResults(exchange, exchange);
+
+                // logging nextExchange as it contains the exchange that might have altered the payload and since
+                // we are logging the completion if will be confusing if we log the original instead
+                // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
+                }
+
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                reactiveExecutor.schedule(cb);
+            }
+        }
+    }
+```
+This decision is made inside the if block <br>
+```if (!stop && more && (first || continueProcessing(exchange, "so breaking 
out of pipeline", LOG)))```
+
+The Pipeline stops routing to the next processor under any of the following three conditions:
+- A previous processor has marked the exchange to stop routing.
+```boolean stop = exchange.isRouteStop();```
+- There are no more processors left in the pipeline, i.e. the following evaluates to ```false```:
+```boolean more = num < size;```
+- PipelineHelper.continueProcessing() evaluates to ```false```, which happens when the exchange encountered a Java exception during routing and it was handled via the exception handling routines. Refer to the implementation below: <br>
+```java
+public final class PipelineHelper {
+    public static boolean continueProcessing(Exchange exchange, String message, Logger log) {
+        ExtendedExchange ee = (ExtendedExchange) exchange;
+        boolean stop = ee.isFailed() || ee.isRollbackOnly() || ee.isRollbackOnlyLast()
+                || (ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled());
+        if (stop) {
+            if (log.isDebugEnabled()) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("Message exchange has failed: ").append(message).append(" for exchange: ").append(exchange);
+                if (exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()) {
+                    sb.append(" Marked as rollback only.");
+                }
+                if (exchange.getException() != null) {
+                    sb.append(" Exception: ").append(exchange.getException());
+                }
+                if (ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled()) {
+                    sb.append(" Handled by the error handler.");
+                }
+                log.debug(sb.toString());
+            }
+
+            return false;
+        }
+        if (ee.isRouteStop()) {
+            if (log.isDebugEnabled()) {
+                log.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange);
+            }
+            return false;
+        }
+
+        return true;
+    }
+
+}
+```
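Putting the pieces together, the decision reduces to a pure boolean predicate. The sketch below is a simplified, Camel-free mirror of that logic (the `ExchangeState` fields are illustrative stand-ins for the real `Exchange`/`ExtendedExchange` getters, noted in the comments):

```java
public class ContinueCheck {
    // Illustrative stand-in for the state Pipeline/PipelineHelper inspect.
    static class ExchangeState {
        boolean routeStop;           // exchange.isRouteStop()
        boolean failed;              // ee.isFailed()
        boolean rollbackOnly;        // ee.isRollbackOnly() || ee.isRollbackOnlyLast()
        boolean errorHandlerHandled; // ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled()
    }

    // Mirrors continueProcessing(): false when the exchange failed, is marked
    // for rollback, was handled by the error handler, or routing was stopped.
    static boolean continueProcessing(ExchangeState ee) {
        boolean stop = ee.failed || ee.rollbackOnly || ee.errorHandlerHandled;
        return !stop && !ee.routeStop;
    }

    // Mirrors the if block in Pipeline.run(): route to the next processor only
    // if not stopped, processors remain, and this is the first step or the
    // continue check passes.
    static boolean routeToNext(ExchangeState ex, int index, int size) {
        boolean stop = ex.routeStop;
        boolean more = index < size;
        boolean first = index == 0;
        return !stop && more && (first || continueProcessing(ex));
    }
}
```

Note the `first ||` short-circuit: the very first processor always runs, which is why a handled exception only halts routing from the second step onward.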
+ 
+Well, now lets re-visit our use case. What if you still want to continue 
routing?
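One conceivable direction (an illustration only, not necessarily the remedy the post goes on to describe) is to make the aggregation produce a "clean" result whenever at least one branch succeeded, so the handled-exception flags of a failed first branch no longer drive the continue decision. A Camel-free sketch, with `Result` and `aggregate` as hypothetical names:

```java
import java.util.List;

public class PreferSuccessAggregation {
    // Illustrative exchange stand-in: a body plus a handled-exception flag.
    record Result(String body, boolean handledException) {}

    // Hypothetical strategy: merge all branch bodies, but mark the aggregated
    // result as failed only when every branch failed. The pipeline's continue
    // check then sees a clean exchange if any branch succeeded.
    static Result aggregate(List<Result> completionOrder) {
        String mergedBody = String.join(",",
                completionOrder.stream().map(Result::body).toList());
        boolean anySuccess = completionOrder.stream()
                .anyMatch(r -> !r.handledException());
        return new Result(mergedBody, !anySuccess);
    }
}
```

With this shape, completion order no longer matters: a failing first branch contributes its body but not its failure flags.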

Review comment:
       ```suggestion
   Well, now let's re-visit our use case. What if you still want to continue 
routing?
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org