Repository: camel Updated Branches: refs/heads/master b914c04cc -> e606fb8ea
CAMEL-9352: camel-jms - Add transferFault option Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e606fb8e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e606fb8e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e606fb8e Branch: refs/heads/master Commit: e606fb8ea0bfa56a782a3c1b36d2ff6aa3276f29 Parents: b914c04 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Nov 23 14:39:47 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Nov 23 14:39:47 2015 +0100 ---------------------------------------------------------------------- .../apache/camel/component/jms/JmsBinding.java | 15 +++- .../camel/component/jms/JmsComponent.java | 12 +++ .../camel/component/jms/JmsConfiguration.java | 18 ++++ .../camel/component/jms/JmsConstants.java | 1 + .../apache/camel/component/jms/JmsEndpoint.java | 10 +++ .../jms/reply/ReplyManagerSupport.java | 12 +++ .../component/jms/JmsTransferFaultTest.java | 93 ++++++++++++++++++++ 7 files changed, 159 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e606fb8e/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java index f8b4eef..8c6c19e 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java @@ -300,7 +300,7 @@ public class JmsBinding { } else { ObjectHelper.notNull(camelMessage, "message"); // create regular jms message using the camel message body - answer = createJmsMessage(exchange, camelMessage.getBody(), camelMessage.getHeaders(), 
session, exchange.getContext()); + answer = createJmsMessage(exchange, camelMessage, session, exchange.getContext()); appendJmsProperties(answer, exchange, camelMessage); } } @@ -450,8 +450,19 @@ public class JmsBinding { return answer; } + protected Message createJmsMessage(Exchange exchange, org.apache.camel.Message camelMessage, Session session, CamelContext context) throws JMSException { + Message answer = createJmsMessage(exchange, camelMessage.getBody(), camelMessage.getHeaders(), session, context); + + // special for transferFault + boolean isFault = camelMessage.isFault(); + if (answer != null && isFault && endpoint != null && endpoint.isTransferFault()) { + answer.setBooleanProperty(JmsConstants.JMS_TRANSFER_FAULT, true); + } + return answer; + } + protected Message createJmsMessage(Exchange exchange, Object body, Map<String, Object> headers, Session session, CamelContext context) throws JMSException { - JmsMessageType type = null; + JmsMessageType type; // special for transferExchange if (endpoint != null && endpoint.isTransferExchange()) { http://git-wip-us.apache.org/repos/asf/camel/blob/e606fb8e/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java index e13eecb..8d5982c 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java @@ -607,6 +607,18 @@ public class JmsComponent extends UriEndpointComponent implements ApplicationCon } /** + * If enabled and you are using Request Reply messaging (InOut) and an Exchange failed with a SOAP fault (not exception) on the consumer side, + * then the fault flag on {@link org.apache.camel.Message#isFault()} 
will be sent back in the response as a JMS header with the key + * {@link JmsConstants#JMS_TRANSFER_FAULT}. + * If the client is Camel, the returned fault flag will be set on the {@link org.apache.camel.Message#setFault(boolean)}. + * <p/> + * You may want to enable this when using Camel components that support faults, such as SOAP-based components like cxf or spring-ws. + */ + public void setTransferFault(boolean transferFault) { + getConfiguration().setTransferFault(transferFault); + } + + /** + * Allows you to use your own implementation of the org.springframework.jms.core.JmsOperations interface. * Camel uses JmsTemplate as default. Can be used for testing purpose, but not used much as stated in the spring API docs. */ http://git-wip-us.apache.org/repos/asf/camel/blob/e606fb8e/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java index aea53f1..342029e 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java @@ -181,6 +181,8 @@ public class JmsConfiguration implements Cloneable { private boolean transferExchange; @UriParam(label = "advanced") private boolean transferException; + @UriParam(label = "advanced") + private boolean transferFault; @UriParam private boolean testConnectionOnStartup; @UriParam(label = "advanced") @@ -1536,6 +1538,22 @@ public class JmsConfiguration implements Cloneable { this.transferException = transferException; } + public boolean isTransferFault() { + return transferFault; + } + + /** + * If enabled and you are using Request Reply messaging (InOut) and an Exchange failed with a SOAP fault (not exception) on the consumer 
side, + * then the fault flag on {@link org.apache.camel.Message#isFault()} will be sent back in the response as a JMS header with the key + * {@link JmsConstants#JMS_TRANSFER_FAULT}. + * If the client is Camel, the returned fault flag will be set on the {@link org.apache.camel.Message#setFault(boolean)}. + * <p/> + * You may want to enable this when using Camel components that support faults, such as SOAP-based components like cxf or spring-ws. + */ + public void setTransferFault(boolean transferFault) { + this.transferFault = transferFault; + } + public boolean isAsyncStartListener() { return asyncStartListener; } http://git-wip-us.apache.org/repos/asf/camel/blob/e606fb8e/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java index 3b20026..d7fc984 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConstants.java @@ -27,6 +27,7 @@ public final class JmsConstants { public static final String JMS_MESSAGE_TYPE = "CamelJmsMessageType"; public static final String JMS_DELIVERY_MODE = "CamelJmsDeliveryMode"; public static final String JMS_REQUEST_TIMEOUT = "CamelJmsRequestTimeout"; + public static final String JMS_TRANSFER_FAULT = "CamelJmsTransferFault"; private JmsConstants() { // utility class http://git-wip-us.apache.org/repos/asf/camel/blob/e606fb8e/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java 
index ea68d14..36f982a 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java @@ -1112,6 +1112,16 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy } @ManagedAttribute + public void setTransferFault(boolean transferFault) { + getConfiguration().setTransferFault(transferFault); + } + + @ManagedAttribute + public boolean isTransferFault() { + return getConfiguration().isTransferFault(); + } + + @ManagedAttribute public boolean isTestConnectionOnStartup() { return configuration.isTestConnectionOnStartup(); } http://git-wip-us.apache.org/repos/asf/camel/blob/e606fb8e/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java index a2f70c3..759ed3c 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java @@ -29,6 +29,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; +import org.apache.camel.component.jms.JmsConstants; import org.apache.camel.component.jms.JmsEndpoint; import org.apache.camel.component.jms.JmsMessage; import org.apache.camel.component.jms.JmsMessageHelper; @@ -168,6 +169,17 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl } else { log.debug("Reply received. 
OUT message body set to reply payload: {}", body); } + if (endpoint.isTransferFault()) { + // remove the header as we do not want to keep it on the Camel Message either + Object faultHeader = response.removeHeader(JmsConstants.JMS_TRANSFER_FAULT); + if (faultHeader != null) { + boolean isFault = exchange.getContext().getTypeConverter().tryConvertTo(boolean.class, faultHeader); + log.debug("Transfer fault on OUT message: {}", isFault); + if (isFault) { + exchange.getOut().setFault(true); + } + } + } // restore correlation id in case the remote server messed with it if (holder.getOriginalCorrelationId() != null) { http://git-wip-us.apache.org/repos/asf/camel/blob/e606fb8e/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferFaultTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferFaultTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferFaultTest.java new file mode 100644 index 0000000..7dd95f4 --- /dev/null +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferFaultTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms; + +import javax.jms.ConnectionFactory; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +/** + * @version + */ +public class JmsTransferFaultTest extends CamelTestSupport { + + protected String getUri() { + return "activemq:queue:foo?transferFault=true"; + } + + @Test + public void testNoFault() throws Exception { + Exchange out = template.request(getUri(), new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Hello World"); + } + }); + assertNotNull(out); + assertEquals("Bye World", out.getOut().getBody()); + assertFalse("Should not be a fault", out.getOut().isFault()); + } + + @Test + public void testTransferFault() throws Exception { + Exchange out = template.request(getUri(), new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Kaboom"); + } + }); + assertNotNull(out); + assertEquals("Bye World", out.getOut().getBody()); + assertTrue("Should be a fault", out.getOut().isFault()); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory)); + + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from(getUri()) + .process(new 
Processor() { + public void process(Exchange exchange) throws Exception { + String body = exchange.getIn().getBody(String.class); + exchange.getOut().setBody("Bye World"); + if (body.equals("Kaboom")) { + exchange.getOut().setFault(true); + } + } + }); + } + }; + } + +} \ No newline at end of file