This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-2.25.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.25.x by this push: new 70a6470 CAMEL-14951: WireTap - If thread pool reject task then Camel error handler should be able to react 70a6470 is described below commit 70a6470d8eb9e3dcec6e382d77ac406250309e95 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Apr 22 14:22:56 2020 +0200 CAMEL-14951: WireTap - If thread pool reject task then Camel error handler should be able to react --- .../apache/camel/processor/WireTapProcessor.java | 32 ++++--- .../camel/processor/WireTapAbortPolicyTest.java | 98 ++++++++++++++++++++++ 2 files changed, 117 insertions(+), 13 deletions(-) diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java index 63f7ae4..c269f63 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -152,20 +152,26 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, final Exchange wireTapExchange = target; // send the exchange to the destination using an executor service - executorService.submit(new Callable<Exchange>() { - public Exchange call() throws Exception { - taskCount.increment(); - try { - LOG.debug(">>>> (wiretap) {} {}", uri, wireTapExchange); - processor.process(wireTapExchange); - } catch (Throwable e) { - LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + uri + ". This exception will be ignored.", e); - } finally { - taskCount.decrement(); + try { + executorService.submit(new Callable<Exchange>() { + public Exchange call() throws Exception { + taskCount.increment(); + try { + LOG.debug(">>>> (wiretap) {} {}", uri, wireTapExchange); + processor.process(wireTapExchange); + } catch (Throwable e) { + LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + uri + ". This exception will be ignored.", e); + } finally { + taskCount.decrement(); + } + return wireTapExchange; } - return wireTapExchange; - } - }); + }); + } catch (Throwable e) { + // in case the thread pool rejects or cannot submit the task then we need to catch + // so camel error handler can react + exchange.setException(e); + } // continue routing this synchronously callback.done(true); diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java new file mode 100644 index 0000000..0e0f9fa --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ThreadPoolRejectedPolicy; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.builder.ThreadPoolBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Wire tap unit test + */ +public class WireTapAbortPolicyTest extends ContextTestSupport { + protected MockEndpoint tap; + protected MockEndpoint result; + protected ExecutorService pool; + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + if (pool != null) { + pool.shutdownNow(); + } + } + + @Test + public void testSend() throws Exception { + // hello must come first, as we have delay on the tapped route + result.expectedMinimumMessageCount(2); + tap.expectedMinimumMessageCount(1); + + template.sendBody("direct:start", "A"); + template.sendBody("direct:start", "B"); + try { + template.sendBody("direct:start", "C"); + fail("Task should be rejected"); + } catch (Exception e) { + assertIsInstanceOf(RejectedExecutionException.class, e.getCause()); + } + + assertMockEndpointsSatisfied(); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + tap = getMockEndpoint("mock:tap"); + result = getMockEndpoint("mock:result"); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() throws Exception { + // START SNIPPET: e1 + // use a custom thread pool for sending tapped messages + ExecutorService pool = new ThreadPoolBuilder(context) + // only allow 1 thread and 1 pending task + .poolSize(1) + .maxPoolSize(1) + .maxQueueSize(1) + // and about tasks + .rejectedPolicy(ThreadPoolRejectedPolicy.Abort) + .build("mypool"); + + from("direct:start").to("log:foo") + // pass in the custom pool to the wireTap DSL + .wireTap("direct:tap").executorService(pool).to("mock:result"); + // END SNIPPET: e1 + + from("direct:tap").delay(1000).to("mock:tap"); + } + }; + } +} \ No newline at end of file