This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 95366aee7f56564bb8170b4c4a40429c1fba5e09 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Apr 22 13:48:18 2020 +0200 CAMEL-14951: WireTap - If thread pool reject task then Camel error handler should be able to react --- .../apache/camel/processor/WireTapProcessor.java | 27 +++--- .../camel/processor/WireTapAbortPolicyTest.java | 99 ++++++++++++++++++++++ 2 files changed, 116 insertions(+), 10 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/WireTapProcessor.java index 00588e7..a63781b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.LongAdder; import org.apache.camel.AsyncCallback; @@ -168,17 +169,23 @@ public class WireTapProcessor extends AsyncProcessorSupport implements Traceable final Exchange wireTapExchange = target; // send the exchange to the destination using an executor service - executorService.submit(() -> { - taskCount.increment(); - LOG.debug(">>>> (wiretap) {} {}", uri, wireTapExchange); - asyncProcessor.process(wireTapExchange, doneSync -> { - if (wireTapExchange.getException() != null) { - String u = URISupport.sanitizeUri(uri); - LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + u + ". 
This exception will be ignored.", wireTapExchange.getException()); - } - taskCount.decrement(); + try { + executorService.submit(() -> { + taskCount.increment(); + LOG.debug(">>>> (wiretap) {} {}", uri, wireTapExchange); + asyncProcessor.process(wireTapExchange, doneSync -> { + if (wireTapExchange.getException() != null) { + String u = URISupport.sanitizeUri(uri); + LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + u + ". This exception will be ignored.", wireTapExchange.getException()); + } + taskCount.decrement(); + }); }); - }); + } catch (Throwable e) { + // in case the thread pool rejects or cannot submit the task then we need to catch + // so camel error handler can react + exchange.setException(e); + } // continue routing this synchronously callback.done(true); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java new file mode 100644 index 0000000..80cbd90 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.builder.ThreadPoolBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Wire tap unit test + */ +public class WireTapAbortPolicyTest extends ContextTestSupport { + protected MockEndpoint tap; + protected MockEndpoint result; + protected ExecutorService pool; + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + if (pool != null) { + pool.shutdownNow(); + } + } + + @Test + public void testSend() throws Exception { + // hello must come first, as we have delay on the tapped route + result.expectedMinimumMessageCount(2); + tap.expectedMinimumMessageCount(1); + + template.sendBody("direct:start", "A"); + template.sendBody("direct:start", "B"); + try { + template.sendBody("direct:start", "C"); + fail("Task should be rejected"); + } catch (Exception e) { + assertIsInstanceOf(RejectedExecutionException.class, e.getCause()); + } + + assertMockEndpointsSatisfied(); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + tap = getMockEndpoint("mock:tap"); + result = getMockEndpoint("mock:result"); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() throws Exception { + // START SNIPPET: e1 + // use a custom thread pool for sending tapped messages + ExecutorService pool = new ThreadPoolBuilder(context) + // only allow 1 thread and 1 pending task + .poolSize(1) + .maxPoolSize(1) + .maxQueueSize(1) + // and abort tasks + .rejectedPolicy(ThreadPoolRejectedPolicy.Abort) + .build(); + 
from("direct:start").to("log:foo") + // pass in the custom pool to the wireTap DSL + .wireTap("direct:tap").executorService(pool).to("mock:result"); + // END SNIPPET: e1 + + from("direct:tap").delay(1000).to("mock:tap"); + } + }; + } +}