This is an automated email from the ASF dual-hosted git repository. bvahdat pushed a commit to branch camel-3.20.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.20.x by this push: new c294a7cc6bd CAMEL-18835: OnCompletionProcessor#onFailure callback fires more than once c294a7cc6bd is described below commit c294a7cc6bd1c2d8216ec23cff5b22c8b38615ae Author: Babak Vahdat <bvah...@apache.org> AuthorDate: Sat Dec 24 12:44:57 2022 +0100 CAMEL-18835: OnCompletionProcessor#onFailure callback fires more than once --- .../camel/processor/OnCompletionProcessor.java | 63 +++++++------ .../model/RouteConfigurationOnCompletionTest.java | 105 +++++++++++++++++++++ 2 files changed, 138 insertions(+), 30 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java index b7de659df48..58c708c29cc 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java @@ -217,7 +217,7 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac Exchange answer; if (isCreateCopy()) { - // for asynchronous routing we must use a copy as we dont want it + // for asynchronous routing we must use a copy as we don't want it // to cause side effects of the original exchange // (the original thread will run in parallel) answer = ExchangeHelper.createCorrelatedCopy(exchange, false); @@ -277,31 +277,12 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac } @Override - @SuppressWarnings("unchecked") public void onComplete(final Exchange exchange) { - String currentRouteId = ExchangeHelper.getRouteId(exchange); - if (!routeScoped && currentRouteId != null && !routeId.equals(currentRouteId)) { - return; - } - - if (routeScoped) { - // check if we visited the route - List<String> routeIds = exchange.getProperty(ExchangePropertyKey.ON_COMPLETION_ROUTE_IDS, List.class); - if (routeIds == null || !routeIds.contains(routeId)) { - return; - } - } - - if (onFailureOnly) { + if (shouldSkip(exchange, onFailureOnly)) { return; } - if (onWhen != null && !onWhen.matches(exchange)) { - // predicate did not match so do not route the onComplete - return; - } - - // must use a copy as we dont want it to cause side effects of the original exchange + // must use a copy as we don't want it to cause side effects of the original exchange final Exchange copy = prepareExchange(exchange); if (executorService != null) { @@ -321,16 +302,11 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac @Override public void onFailure(final Exchange exchange) { - if (onCompleteOnly) { - return; - } - - if (onWhen != null && !onWhen.matches(exchange)) { - // predicate did not match so do not route the onComplete + if (shouldSkip(exchange, onCompleteOnly)) { return; } - // must use a copy as we dont want it to cause side effects of the original exchange + // must use a copy as we don't want it to cause side effects of the original exchange final Exchange copy = prepareExchange(exchange); final Exception original = copy.getException(); if (original != null) { @@ -358,6 +334,33 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac } } + @SuppressWarnings("unchecked") + private boolean shouldSkip(Exchange exchange, boolean onCompleteOrOnFailureOnly) { + String currentRouteId = ExchangeHelper.getRouteId(exchange); + if (!routeScoped && currentRouteId != null && !routeId.equals(currentRouteId)) { + return true; + } + + if (routeScoped) { + // check if we visited the route + List<String> routeIds = exchange.getProperty(ExchangePropertyKey.ON_COMPLETION_ROUTE_IDS, List.class); + if (routeIds == null || !routeIds.contains(routeId)) { + return true; + } + } + + if (onCompleteOrOnFailureOnly) { + return true; + } + + if (onWhen != null && !onWhen.matches(exchange)) { + // predicate did not match so do not route the onComplete + return true; + } + + return false; + } + @Override public String toString() { if (!onCompleteOnly && !onFailureOnly) { @@ -432,7 +435,7 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac return; } - // must use a copy as we dont want it to cause side effects of the original exchange + // must use a copy as we don't want it to cause side effects of the original exchange final Exchange copy = prepareExchange(exchange); if (executorService != null) { diff --git a/core/camel-core/src/test/java/org/apache/camel/model/RouteConfigurationOnCompletionTest.java b/core/camel-core/src/test/java/org/apache/camel/model/RouteConfigurationOnCompletionTest.java new file mode 100644 index 00000000000..a589ad5baf9 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/model/RouteConfigurationOnCompletionTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.model; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.builder.RouteConfigurationBuilder; +import org.apache.camel.processor.OnCompletionTest; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.fail; + +public class RouteConfigurationOnCompletionTest extends ContextTestSupport { + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + camelContext.addRoutes(new RouteConfigurationBuilder() { + @Override + public void configuration() throws Exception { + routeConfiguration().onCompletion().onCompleteOnly().to("log:ok").to("mock:ok"); + routeConfiguration().onCompletion().onFailureOnly().to("log:fail").to("mock:fail"); + } + }); + + return camelContext; + } + + @Test + public void testOk() throws Exception { + getMockEndpoint("mock:ok").expectedMessageCount(1); + getMockEndpoint("mock:fail").expectedMessageCount(0); + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testFail() throws Exception { + getMockEndpoint("mock:ok").expectedMessageCount(0); + getMockEndpoint("mock:fail").expectedMessageCount(1); + getMockEndpoint("mock:result").expectedMessageCount(0); + + try { + template.sendBody("direct:start", "Kabom"); + fail("Should have thrown exception"); + } catch (Exception e) { + // expected + } + + assertMockEndpointsSatisfied(); + } + + @Test + public void testOkAndFail() throws Exception { + getMockEndpoint("mock:ok").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:fail").expectedBodiesReceived("Kabom"); + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + template.sendBody("direct:start", "Hello World"); + try { + template.sendBody("direct:start", "Kabom"); + fail("Should throw exception"); + } catch (Exception e) { + // expected + } + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .to("direct:end"); + + from("direct:end") + // CAMEL-18835: apply the processor by this route and not the one above to + // enforce multiple calls to the OnCompletionProcessor#onFailure callback + .process(new OnCompletionTest.MyProcessor()) + .to("mock:result"); + } + }; + } + +}