This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.25.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.25.x by this push:
     new 70a6470  CAMEL-14951: WireTap - If thread pool reject task then Camel 
error handler should be able to react
70a6470 is described below

commit 70a6470d8eb9e3dcec6e382d77ac406250309e95
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Apr 22 14:22:56 2020 +0200

    CAMEL-14951: WireTap - If thread pool reject task then Camel error handler 
should be able to react
---
 .../apache/camel/processor/WireTapProcessor.java   | 32 ++++---
 .../camel/processor/WireTapAbortPolicyTest.java    | 98 ++++++++++++++++++++++
 2 files changed, 117 insertions(+), 13 deletions(-)

diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 63f7ae4..c269f63 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -152,20 +152,26 @@ public class WireTapProcessor extends ServiceSupport 
implements AsyncProcessor,
         final Exchange wireTapExchange = target;
 
         // send the exchange to the destination using an executor service
-        executorService.submit(new Callable<Exchange>() {
-            public Exchange call() throws Exception {
-                taskCount.increment();
-                try {
-                    LOG.debug(">>>> (wiretap) {} {}", uri, wireTapExchange);
-                    processor.process(wireTapExchange);
-                } catch (Throwable e) {
-                    LOG.warn("Error occurred during processing " + 
wireTapExchange + " wiretap to " + uri + ". This exception will be ignored.", 
e);
-                } finally {
-                    taskCount.decrement();
+        try {
+            executorService.submit(new Callable<Exchange>() {
+                public Exchange call() throws Exception {
+                    taskCount.increment();
+                    try {
+                        LOG.debug(">>>> (wiretap) {} {}", uri, 
wireTapExchange);
+                        processor.process(wireTapExchange);
+                    } catch (Throwable e) {
+                        LOG.warn("Error occurred during processing " + 
wireTapExchange + " wiretap to " + uri + ". This exception will be ignored.", 
e);
+                    } finally {
+                        taskCount.decrement();
+                    }
+                    return wireTapExchange;
                 }
-                return wireTapExchange;
-            }
-        });
+            });
+        } catch (Throwable e) {
+            // in case the thread pool rejects or cannot submit the task then 
we need to catch
+            // so camel error handler can react
+            exchange.setException(e);
+        }
 
         // continue routing this synchronously
         callback.done(true);
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
new file mode 100644
index 0000000..0e0f9fa
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ThreadPoolRejectedPolicy;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.builder.ThreadPoolBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Wire tap unit test
+ */
+public class WireTapAbortPolicyTest extends ContextTestSupport {
+    protected MockEndpoint tap;
+    protected MockEndpoint result;
+    protected ExecutorService pool;
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if (pool != null) {
+            pool.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testSend() throws Exception {
+        // hello must come first, as we have delay on the tapped route
+        result.expectedMinimumMessageCount(2);
+        tap.expectedMinimumMessageCount(1);
+
+        template.sendBody("direct:start", "A");
+        template.sendBody("direct:start", "B");
+        try {
+            template.sendBody("direct:start", "C");
+            fail("Task should be rejected");
+        } catch (Exception e) {
+            assertIsInstanceOf(RejectedExecutionException.class, e.getCause());
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        tap = getMockEndpoint("mock:tap");
+        result = getMockEndpoint("mock:result");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                // use a custom thread pool for sending tapped messages
+                ExecutorService pool = new ThreadPoolBuilder(context)
+                        // only allow 1 thread and 1 pending task
+                        .poolSize(1)
+                        .maxPoolSize(1)
+                        .maxQueueSize(1)
+                        // and about tasks
+                        .rejectedPolicy(ThreadPoolRejectedPolicy.Abort)
+                        .build("mypool");
+
+                from("direct:start").to("log:foo")
+                        // pass in the custom pool to the wireTap DSL
+                        
.wireTap("direct:tap").executorService(pool).to("mock:result");
+                // END SNIPPET: e1
+
+                from("direct:tap").delay(1000).to("mock:tap");
+            }
+        };
+    }
+}
\ No newline at end of file

Reply via email to