hokutor commented on a change in pull request #575:
URL: https://github.com/apache/camel-website/pull/575#discussion_r631347857



##########
File path: content/blog/2021/05/multicast-failure-routing/index.md
##########
@@ -0,0 +1,199 @@
+---
+title: "Routing multicast output after encountering partial failures"
+date: 2021-05-10
+draft: false
+authors: [Reji Mathews, Hokutor]
+categories: ["Usecases"]
+preview: "Routing multicast output after encountering partial failures"
+---
+
+## Problem description
+
+Multicast is a powerful EIP which supports parallel execution paths in 
asynchronous manner. There are various ways a camel user can configure a 
multicast EIP. Check out the extensive documentation 
[here](https://camel.apache.org/components/3.4.x/eips/multicast-eip.html)
+1. One can configure to execute all the child paths independently and continue 
routing the last reply as the outgoing message (default behavior unless you 
provide an aggregation strategy)
+2. Additionally, you can plug in an implementation of [camel aggregation 
strategy]( 
https://github.com/apache/camel/blob/camel-3.7.x/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java
 ) with user defined logic to aggregate the output from each of those child 
path before continuing further downstream routing. 
+
+For the use case discussed below, the requirement is to aggregate the computed 
results from all child paths before it gets routed to the downstream processors 
in the flow. The idea is to keep routing the aggregated results if atleast one 
child route completes successfully without an exception. We also want to stop 
routing further if all the child exchanges experienced failures.
+
+## Use case 
+
+Check out the following camel routes 
+
+```java
+@Override
+public void configure() throws Exception {
+    onException(Exception.class)
+        .useOriginalMessage()
+        .handled(true)
+        .log("Exception handler invoked")
+        .transform().constant("{\"data\" : \"err\"}")
+        .end();
+
+    from("jetty:http://localhost:8081/myapi?httpMethodRestrict=GET";)
+        .log("received request")
+        .log("Entering multicast")
+        .multicast(new SimpleFlowMergeAggregator())
+        .parallelProcessing().to("direct:A", "direct:B")
+        .end()
+        .log("Aggregated results ${body}")
+        .log("Another log")
+        .transform(simple("{\"result\" : \"success\"}"))
+        .end();
+
+    from("direct:A")
+        .log("Executing PATH_1 - exception path")
+        .transform(constant("DATA_FROM_PATH_1"))
+        .log("Starting exception throw")
+        .throwException(new Exception("USER INITIATED EXCEPTION"))
+        .log("PATH_1")
+        .end();
+
+    from("direct:B")
+        .log("Executing PATH_2 - success path")
+        .delayer(1000)
+        .transform(constant("DATA_FROM_PATH_2"))
+        .log("PATH_2")
+        .end();
+}
+````
+
+
+Following strategy aggregates the output of each multicast child route as a 
java list
+
+```java
+public class SimpleFlowMergeAggregator implements AggregationStrategy {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());
+        if(oldExchange == null) {
+            String data = newExchange.getIn().getBody(String.class);
+            List<String> aggregatedDataList = new ArrayList<>();
+            aggregatedDataList.add(data);
+            newExchange.getIn().setBody(aggregatedDataList);
+            return newExchange;
+        }
+
+        List<String> oldData = oldExchange.getIn().getBody(List.class);
+        oldData.add(newExchange.getIn().getBody(String.class));
+        oldExchange.getIn().setBody(oldData);
+
+        return oldExchange;
+    }
+}
+```
+
+On executing the same, we see following logs
+```log
+2021-05-06 12:43:18.565  INFO 13956 --- [qtp916897446-42] route1               
                    : received request
+2021-05-06 12:43:18.566  INFO 13956 --- [qtp916897446-42] route1               
                    : Entering multicast
+2021-05-06 12:43:18.575  INFO 13956 --- [ #4 - Multicast] route2               
                    : Executing PATH_1 - exception path
+2021-05-06 12:43:18.575  INFO 13956 --- [ #4 - Multicast] route2               
                    : Starting exception throw
+2021-05-06 12:43:18.578  INFO 13956 --- [ #4 - Multicast] route2               
                    : Exception handler invoked
+2021-05-06 12:43:18.579  INFO 13956 --- [ #4 - Multicast] 
c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator {"data" : "err"}
+2021-05-06 12:43:19.575  INFO 13956 --- [ #3 - Multicast] route3               
                    : Executing PATH_2 - success path
+2021-05-06 12:43:21.576  INFO 13956 --- [ #3 - Multicast] route3               
                    : PATH_2
+2021-05-06 12:43:21.576  INFO 13956 --- [ #3 - Multicast] 
c.e.d.m.SimpleFlowMergeAggregator        : Inside aggregator DATA_FROM_PATH_2
+```
+
+## What could take you by a surprise?
+
+- When the multicast completes aggregating exchanges from child branches, one 
might intermittently note that it stops routing the remaining processors (those 
2 additional log and a transform step in above example). On execution tracing 
exercise, you will notice this happens in a special scenario when the very 
first exchange which arrives in the aggregator (from first completed child 
branch) had encountered an exception during its course or/and was handled via 
onException flows. On the flip side, if the first exchange was a successful and 
even though all the remaining ones experienced a failure, it still continued 
routing the remaining processors/steps.
+
+
+## Analysis
+
+To understand this better, lets deep dive into the open source codebase. 
Notice the [Pipeline class]( 
https://github.com/apache/camel/blob/6dff85675e9b73e8a528bc2683935ec3c1ed26b7/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
 ). PipelineProcessor (part of camel core framework processors) performs an 
evaluation after every user processors (user added steps in a camel flow) on 
whether it should continue routing to the next processor.
+This decision is made inside Pipeline class on [this if 
block](https://github.com/apache/camel/blob/6dff85675e9b73e8a528bc2683935ec3c1ed26b7/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java#L79)
 <br>

Review comment:
       done. can you review now?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to