This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 95366aee7f56564bb8170b4c4a40429c1fba5e09
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Apr 22 13:48:18 2020 +0200

    CAMEL-14951: WireTap - If thread pool reject task then Camel error handler should be able to react
---
 .../apache/camel/processor/WireTapProcessor.java   | 27 +++---
 .../camel/processor/WireTapAbortPolicyTest.java    | 99 ++++++++++++++++++++++
 2 files changed, 116 insertions(+), 10 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 00588e7..a63781b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.camel.AsyncCallback;
@@ -168,17 +169,23 @@ public class WireTapProcessor extends AsyncProcessorSupport implements Traceable
         final Exchange wireTapExchange = target;
 
         // send the exchange to the destination using an executor service
-        executorService.submit(() -> {
-            taskCount.increment();
-            LOG.debug(">>>> (wiretap) {} {}", uri, wireTapExchange);
-            asyncProcessor.process(wireTapExchange, doneSync -> {
-                if (wireTapExchange.getException() != null) {
-                    String u = URISupport.sanitizeUri(uri);
-                    LOG.warn("Error occurred during processing " + 
wireTapExchange + " wiretap to " + u + ". This exception will be ignored.", 
wireTapExchange.getException());
-                }
-                taskCount.decrement();
+        try {
+            executorService.submit(() -> {
+                taskCount.increment();
+                LOG.debug(">>>> (wiretap) {} {}", uri, wireTapExchange);
+                asyncProcessor.process(wireTapExchange, doneSync -> {
+                    if (wireTapExchange.getException() != null) {
+                        String u = URISupport.sanitizeUri(uri);
+                        LOG.warn("Error occurred during processing " + 
wireTapExchange + " wiretap to " + u + ". This exception will be ignored.", 
wireTapExchange.getException());
+                    }
+                    taskCount.decrement();
+                });
             });
-        });
+        } catch (Throwable e) {
+            // in case the thread pool rejects or cannot submit the task then we need to catch
+            // so camel error handler can react
+            exchange.setException(e);
+        }
 
         // continue routing this synchronously
         callback.done(true);
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
new file mode 100644
index 0000000..80cbd90
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.builder.ThreadPoolBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
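+ * Unit test verifying that a wire tap whose thread pool uses the Abort rejected policy fails the send with a RejectedExecutionException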
+ */
+public class WireTapAbortPolicyTest extends ContextTestSupport {
+    protected MockEndpoint tap;
+    protected MockEndpoint result;
+    protected ExecutorService pool;
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if (pool != null) {
+            pool.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testSend() throws Exception {
+        // pool has 1 thread and 1 queue slot and the tapped route is delayed, so A and B are accepted and C is expected to be rejected
+        result.expectedMinimumMessageCount(2);
+        tap.expectedMinimumMessageCount(1);
+
+        template.sendBody("direct:start", "A");
+        template.sendBody("direct:start", "B");
+        try {
+            template.sendBody("direct:start", "C");
+            fail("Task should be rejected");
+        } catch (Exception e) {
+            assertIsInstanceOf(RejectedExecutionException.class, e.getCause());
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        tap = getMockEndpoint("mock:tap");
+        result = getMockEndpoint("mock:result");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                // use a custom thread pool for sending tapped messages
+                ExecutorService pool = new ThreadPoolBuilder(context)
+                        // only allow 1 thread and 1 pending task
+                        .poolSize(1)
+                        .maxPoolSize(1)
+                        .maxQueueSize(1)
+                        // and abort tasks
+                        .rejectedPolicy(ThreadPoolRejectedPolicy.Abort)
+                        .build();
+
+                from("direct:start").to("log:foo")
+                    // pass in the custom pool to the wireTap DSL
+                    
.wireTap("direct:tap").executorService(pool).to("mock:result");
+                // END SNIPPET: e1
+
+                from("direct:tap").delay(1000).to("mock:tap");
+            }
+        };
+    }
+}

Reply via email to