zregvart commented on a change in pull request #575:
URL: https://github.com/apache/camel-website/pull/575#discussion_r631312854

##########
File path: content/blog/2021/05/multicast-failure-routing/index.md
##########
@@ -0,0 +1,199 @@
+---
+title: "Routing multicast output after encountering partial failures"
+date: 2021-05-10
+draft: false
+authors: [Reji Mathews, Hokutor]
+categories: ["Usecases"]
+preview: "Routing multicast output after encountering partial failures"
+---
+
+## Problem description
+
+Multicast is a powerful EIP which supports parallel execution paths in asynchronous manner. There are various ways a camel user can configure a multicast EIP. Check out the extensive documentation [here](https://camel.apache.org/components/3.4.x/eips/multicast-eip.html)

Review comment:
```suggestion
Multicast is a powerful EIP which supports parallel execution paths in asynchronous manner. There are various ways a camel user can configure a multicast EIP. Check out the extensive documentation [here](/components/latest/eips/multicast-eip.html)
```
Let's use the latest version and we need to use relative links for the link checker to check links to camel.apache.org.

##########
File path: content/blog/2021/05/multicast-failure-routing/index.md
##########
@@ -0,0 +1,199 @@
+---
+title: "Routing multicast output after encountering partial failures"
+date: 2021-05-10
+draft: false
+authors: [Reji Mathews, Hokutor]
+categories: ["Usecases"]
+preview: "Routing multicast output after encountering partial failures"
+---
+
+## Problem description
+
+Multicast is a powerful EIP which supports parallel execution paths in asynchronous manner. There are various ways a camel user can configure a multicast EIP. Check out the extensive documentation [here](https://camel.apache.org/components/3.4.x/eips/multicast-eip.html)
+1. One can configure to execute all the child paths independently and continue routing the last reply as the outgoing message (default behavior unless you provide an aggregation strategy)
+2. Additionally, you can plug in an implementation of [camel aggregation strategy]( https://github.com/apache/camel/blob/camel-3.7.x/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java ) with user defined logic to aggregate the output from each of those child path before continuing further downstream routing.

Review comment:
```suggestion
2. Additionally, you can plug in an implementation of [camel aggregation strategy](https://github.com/apache/camel/blob/main/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java) with user defined logic to aggregate the output from each of those child path before continuing further downstream routing.
```
Perhaps also point to the `main` branch?

##########
File path: content/blog/2021/05/multicast-failure-routing/index.md
##########
@@ -0,0 +1,199 @@
+---
+title: "Routing multicast output after encountering partial failures"
+date: 2021-05-10
+draft: false
+authors: [Reji Mathews, Hokutor]

Review comment:
These need to be the GitHub usernames, we use them to pull avatars from the GitHub profile, so you might want to set an image in your GitHub profile.

##########
File path: content/blog/2021/05/multicast-failure-routing/index.md
##########
@@ -0,0 +1,199 @@
+---
+title: "Routing multicast output after encountering partial failures"
+date: 2021-05-10
+draft: false
+authors: [Reji Mathews, Hokutor]
+categories: ["Usecases"]
+preview: "Routing multicast output after encountering partial failures"
+---
+
+## Problem description
+
+Multicast is a powerful EIP which supports parallel execution paths in asynchronous manner. There are various ways a camel user can configure a multicast EIP. Check out the extensive documentation [here](https://camel.apache.org/components/3.4.x/eips/multicast-eip.html)
+1. One can configure to execute all the child paths independently and continue routing the last reply as the outgoing message (default behavior unless you provide an aggregation strategy)
+2. Additionally, you can plug in an implementation of [camel aggregation strategy]( https://github.com/apache/camel/blob/camel-3.7.x/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java ) with user defined logic to aggregate the output from each of those child path before continuing further downstream routing.
+
+For the use case discussed below, the requirement is to aggregate the computed results from all child paths before it gets routed to the downstream processors in the flow. The idea is to keep routing the aggregated results if atleast one child route completes successfully without an exception. We also want to stop routing further if all the child exchanges experienced failures.
+
+## Use case
+
+Check out the following camel routes
+
+```java
+@Override
+public void configure() throws Exception {
+    onException(Exception.class)
+        .useOriginalMessage()
+        .handled(true)
+        .log("Exception handler invoked")
+        .transform().constant("{\"data\" : \"err\"}")
+        .end();
+
+    from("jetty:http://localhost:8081/myapi?httpMethodRestrict=GET")
+        .log("received request")
+        .log("Entering multicast")
+        .multicast(new SimpleFlowMergeAggregator())
+        .parallelProcessing().to("direct:A", "direct:B")
+        .end()
+        .log("Aggregated results ${body}")
+        .log("Another log")
+        .transform(simple("{\"result\" : \"success\"}"))
+        .end();
+
+    from("direct:A")
+        .log("Executing PATH_1 - exception path")
+        .transform(constant("DATA_FROM_PATH_1"))
+        .log("Starting exception throw")
+        .throwException(new Exception("USER INITIATED EXCEPTION"))
+        .log("PATH_1")
+        .end();
+
+    from("direct:B")
+        .log("Executing PATH_2 - success path")
+        .delayer(1000)
+        .transform(constant("DATA_FROM_PATH_2"))
+        .log("PATH_2")
+        .end();
+}
+````
+
+
+Following strategy aggregates the output of each multicast child route as a java list
+
+```java
+public class SimpleFlowMergeAggregator implements AggregationStrategy {
+    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());
+        if(oldExchange == null) {
+            String data = newExchange.getIn().getBody(String.class);
+            List<String> aggregatedDataList = new ArrayList<>();
+            aggregatedDataList.add(data);
+            newExchange.getIn().setBody(aggregatedDataList);
+            return newExchange;
+        }
+
+        List<String> oldData = oldExchange.getIn().getBody(List.class);
+        oldData.add(newExchange.getIn().getBody(String.class));
+        oldExchange.getIn().setBody(oldData);
+
+        return oldExchange;
+    }
+}
+```
+
+On executing the same, we see following logs
+```log
+2021-05-06 12:43:18.565 INFO 13956 --- [qtp916897446-42] route1 : received request
+2021-05-06 12:43:18.566 INFO 13956 --- [qtp916897446-42] route1 : Entering multicast
+2021-05-06 12:43:18.575 INFO 13956 --- [ #4 - Multicast] route2 : Executing PATH_1 - exception path
+2021-05-06 12:43:18.575 INFO 13956 --- [ #4 - Multicast] route2 : Starting exception throw
+2021-05-06 12:43:18.578 INFO 13956 --- [ #4 - Multicast] route2 : Exception handler invoked
+2021-05-06 12:43:18.579 INFO 13956 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator {"data" : "err"}
+2021-05-06 12:43:19.575 INFO 13956 --- [ #3 - Multicast] route3 : Executing PATH_2 - success path
+2021-05-06 12:43:21.576 INFO 13956 --- [ #3 - Multicast] route3 : PATH_2
+2021-05-06 12:43:21.576 INFO 13956 --- [ #3 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator DATA_FROM_PATH_2
+```
+
+## What could take you by a surprise?
+
+- When the multicast completes aggregating exchanges from child branches, one might intermittently note that it stops routing the remaining processors (those 2 additional log and a transform step in above example).
On execution tracing exercise, you will notice this happens in a special scenario when the very first exchange which arrives in the aggregator (from first completed child branch) had encountered an exception during its course or/and was handled via onException flows. On the flip side, if the first exchange was a successful and even though all the remaining ones experienced a failure, it still continued routing the remaining processors/steps.
+
+
+## Analysis
+
+To understand this better, lets deep dive into the open source codebase. Notice the [Pipeline class]( https://github.com/apache/camel/blob/6dff85675e9b73e8a528bc2683935ec3c1ed26b7/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java ). PipelineProcessor (part of camel core framework processors) performs an evaluation after every user processors (user added steps in a camel flow) on whether it should continue routing to the next processor.

Review comment:
```suggestion
To understand this better, lets deep dive into the open source codebase. Notice the [Pipeline class](https://github.com/apache/camel/blob/main/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java). PipelineProcessor (part of camel core framework processors) performs an evaluation after every user processors (user added steps in a camel flow) on whether it should continue routing to the next processor.
```
Would it make sense to point to `main` branch?

##########
File path: content/blog/2021/05/multicast-failure-routing/index.md
##########
@@ -0,0 +1,199 @@
+## Analysis
+
+To understand this better, lets deep dive into the open source codebase. Notice the [Pipeline class]( https://github.com/apache/camel/blob/6dff85675e9b73e8a528bc2683935ec3c1ed26b7/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java ). PipelineProcessor (part of camel core framework processors) performs an evaluation after every user processors (user added steps in a camel flow) on whether it should continue routing to the next processor.
+This decision is made inside Pipeline class on [this if block](https://github.com/apache/camel/blob/6dff85675e9b73e8a528bc2683935ec3c1ed26b7/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java#L79) <br>

Review comment:
Yeah I see the issue, the `main` branch could diverge and this line would not be valid any more. Do you think it makes sense to provide a snippet of the code from `Pipeline.java` here instead?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org