CAMEL-7736: Fixed an issue where an exchange may get stuck as inflight if creating the producer fails when it is used in dynamic EIPs such as routing slip etc.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f341506e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f341506e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f341506e Branch: refs/heads/camel-2.12.x Commit: f341506ea7a88dbe2f27636b8761ee0c3a597c5d Parents: 42e26e1 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Aug 22 13:37:50 2014 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Aug 22 13:39:38 2014 +0200 ---------------------------------------------------------------------- .../org/apache/camel/impl/ProducerCache.java | 37 ++++++++----- .../RoutingSlipCreateProducerFailedTest.java | 57 ++++++++++++++++++++ 2 files changed, 81 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f341506e/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java index 7b713c8..35add69 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java @@ -267,20 +267,31 @@ public class ProducerCache extends ServiceSupport { */ public boolean doInAsyncProducer(final Endpoint endpoint, final Exchange exchange, final ExchangePattern pattern, final AsyncCallback callback, final AsyncProducerCallback producerCallback) { - boolean sync = true; - // get the producer and we do not mind if its pooled as we can handle returning it back to the pool - final Producer producer = doGetProducer(endpoint, true); - - if (producer == null) { - if (isStopped()) { - LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange); - return false; - } else { - throw new 
IllegalStateException("No producer, this processor has not been started: " + this); + Producer target; + try { + // get the producer and we do not mind if its pooled as we can handle returning it back to the pool + target = doGetProducer(endpoint, true); + + if (target == null) { + if (isStopped()) { + LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange); + callback.done(true); + return true; + } else { + exchange.setException(new IllegalStateException("No producer, this processor has not been started: " + this)); + callback.done(true); + return true; + } } + } catch (Throwable e) { + exchange.setException(e); + callback.done(true); + return true; } + final Producer producer = target; + // record timing for sending the exchange using the producer final StopWatch watch = eventNotifierEnabled && exchange != null ? new StopWatch() : null; @@ -290,7 +301,7 @@ public class ProducerCache extends ServiceSupport { } // invoke the callback AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(producer); - sync = producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, new AsyncCallback() { + return producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, new AsyncCallback() { @Override public void done(boolean doneSync) { try { @@ -322,9 +333,9 @@ public class ProducerCache extends ServiceSupport { if (exchange != null) { exchange.setException(e); } + callback.done(true); + return true; } - - return sync; } protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern, http://git-wip-us.apache.org/repos/asf/camel/blob/f341506e/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipCreateProducerFailedTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipCreateProducerFailedTest.java 
b/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipCreateProducerFailedTest.java new file mode 100644 index 0000000..78749fb --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipCreateProducerFailedTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.routingslip; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.FailedToCreateProducerException; +import org.apache.camel.builder.RouteBuilder; + +public class RoutingSlipCreateProducerFailedTest extends ContextTestSupport { + + public void testRoutingSlipCreateProducerFailed() throws Exception { + // no inflight + assertEquals(0, context.getInflightRepository().size()); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", "log:foo"); + + // no inflight + assertEquals(0, context.getInflightRepository().size()); + + // those 2 options not allowed together + try { + template.sendBodyAndHeader("direct:start", "Hello World", "foo", "file://target/test?fileExist=Append&tempPrefix=hello"); + fail("Should fail"); + } catch (CamelExecutionException e) { + assertIsInstanceOf(FailedToCreateProducerException.class, e.getCause()); + } + + // no inflight + assertEquals(0, context.getInflightRepository().size()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .routingSlip(header("foo")); + } + }; + } +}