This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.14.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 39894f61cda7a3b14ea2f653ecf6b17c9614f821 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Jul 29 10:16:04 2022 +0200 CAMEL-18324: camel-core - Exception during preparing exchange task can block thread --- .../java/org/apache/camel/processor/Pipeline.java | 20 +++--- .../apache/camel/processor/PooledTaskFactory.java | 10 ++- .../errorhandler/RedeliveryErrorHandler.java | 22 ++++--- .../PopulateInitialHeadersFailedIssueTest.java | 73 ++++++++++++++++++++++ 4 files changed, 109 insertions(+), 16 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java index 29d7596ef5c..0058cdf02e3 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java @@ -175,15 +175,21 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo @Override public boolean process(Exchange exchange, AsyncCallback callback) { - // create task which has state used during routing - PooledExchangeTask task = taskFactory.acquire(exchange, callback); + try { + // create task which has state used during routing + PooledExchangeTask task = taskFactory.acquire(exchange, callback); - if (exchange.isTransacted()) { - reactiveExecutor.scheduleQueue(task); - } else { - reactiveExecutor.scheduleMain(task); + if (exchange.isTransacted()) { + reactiveExecutor.scheduleQueue(task); + } else { + reactiveExecutor.scheduleMain(task); + } + return false; + } catch (Throwable e) { + exchange.setException(e); + callback.done(true); + return true; } - return false; } @Override diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java index ac184ab2092..18d26707257 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java @@ -47,7 +47,15 @@ public abstract class PooledTaskFactory extends PooledObjectFactorySupport<Poole statistics.acquired.increment(); } } - task.prepare(exchange, callback); + try { + task.prepare(exchange, callback); + } catch (Throwable e) { + // if error during prepare then we need to discard this task + if (statisticsEnabled) { + statistics.discarded.increment(); + } + throw e; + } return task; } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index 5e017715b43..65ab7d57ad3 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -206,16 +206,22 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport */ @Override public boolean process(final Exchange exchange, final AsyncCallback callback) { - // Create the redelivery task object for this exchange (optimize to only create task can do redelivery or not) - Runnable task = taskFactory.acquire(exchange, callback); + try { + // Create the redelivery task object for this exchange (optimize to only create task can do redelivery or not) + Runnable task = taskFactory.acquire(exchange, callback); - // Run it - if (exchange.isTransacted()) { - reactiveExecutor.scheduleQueue(task); - } else { - reactiveExecutor.scheduleMain(task); + // Run it + if (exchange.isTransacted()) { + reactiveExecutor.scheduleQueue(task); + } else { + reactiveExecutor.scheduleMain(task); + } + return false; + } catch (Throwable e) { + exchange.setException(e); + callback.done(true); + return true; } - return false; } @Override diff --git a/core/camel-core/src/test/java/org/apache/camel/issues/PopulateInitialHeadersFailedIssueTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/PopulateInitialHeadersFailedIssueTest.java new file mode 100644 index 00000000000..dbcea545606 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/issues/PopulateInitialHeadersFailedIssueTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.issues; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.support.DefaultExchange; +import org.apache.camel.support.DefaultMessage; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +public class PopulateInitialHeadersFailedIssueTest extends ContextTestSupport { + + @Test + public void testPopulateInitialHeadersFailed() throws Exception { + Exchange exchange = new DefaultExchange(context.getEndpoint("seda:start")); + exchange.setPattern(ExchangePattern.InOut); + MyFaultMessage msg = new MyFaultMessage(exchange); + exchange.setMessage(msg); + msg.setBody("Hello World"); + + getMockEndpoint("mock:result").expectedMessageCount(0); + template.send("seda:start", exchange); + assertMockEndpointsSatisfied(); + + IllegalArgumentException iae = assertIsInstanceOf(IllegalArgumentException.class, exchange.getException()); + Assertions.assertEquals("Forced headers error", iae.getMessage()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // enable redelivery which forces copy defensive headers + errorHandler(defaultErrorHandler().maximumRedeliveries(3).redeliveryDelay(0)); + + from("seda:start") + .to("mock:result"); + } + }; + } + + private class MyFaultMessage extends DefaultMessage { + + public MyFaultMessage(Exchange exchange) { + super(exchange); + } + + @Override + protected void populateInitialHeaders(Map<String, Object> map) { + throw new IllegalArgumentException("Forced headers error"); + } + } +}