Repository: camel Updated Branches: refs/heads/master 3180c8d80 -> c8b835e6f
CAMEL-10666: added uri param to control whether or not serializable headers should be included Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5dd59162 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5dd59162 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5dd59162 Branch: refs/heads/master Commit: 5dd59162e4ac9335e497934b8d662f598cb779d1 Parents: 3180c8d Author: Davide Cavestro <davide.caves...@gmail.com> Authored: Tue Jan 3 16:19:17 2017 +0100 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Mon Jan 9 12:42:15 2017 +0100 ---------------------------------------------------------------------- .../apache/camel/component/jms/JmsBinding.java | 2 +- .../camel/component/jms/JmsConfiguration.java | 18 +++ .../apache/camel/component/jms/JmsEndpoint.java | 10 ++ ...ctiveMQPropagateSerializableHeadersTest.java | 117 +++++++++++++++++++ 4 files changed, 146 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5dd59162/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java index 8c6c19e..871ef64 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java @@ -467,7 +467,7 @@ public class JmsBinding { // special for transferExchange if (endpoint != null && endpoint.isTransferExchange()) { LOG.trace("Option transferExchange=true so we use JmsMessageType: Object"); - Serializable holder = DefaultExchangeHolder.marshal(exchange); + Serializable holder = DefaultExchangeHolder.marshal(exchange, 
false, endpoint.isAllowSerializedHeaders()); Message answer = session.createObjectMessage(holder); // ensure default delivery mode is used by default answer.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE); http://git-wip-us.apache.org/repos/asf/camel/blob/5dd59162/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java index b1b9724..8d297b8 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java @@ -333,6 +333,11 @@ public class JmsConfiguration implements Cloneable { + " You must enable this option on both the producer and consumer side, so Camel knows the payloads is an Exchange and not a regular payload.") private boolean transferExchange; @UriParam(label = "advanced", + description = "Controls whether or not to include serialized headers." + + " Applies only when {@code transferExchange} is {@code true}." + + " This requires that the objects are serializable. Camel will exclude any non-serializable objects and log it at WARN level.") + private boolean allowSerializedHeaders; + @UriParam(label = "advanced", description = "If enabled and you are using Request Reply messaging (InOut) and an Exchange failed on the consumer side," + " then the caused Exception will be send back in response as a javax.jms.ObjectMessage." + " If the client is Camel, the returned Exception is rethrown. 
This allows you to use Camel JMS as a bridge" @@ -1818,6 +1823,19 @@ public class JmsConfiguration implements Cloneable { this.transferExchange = transferExchange; } + public boolean isAllowSerializedHeaders() { + return allowSerializedHeaders; + } + + /** + * Controls whether or not to include serialized headers. + * Applies only when {@link #isTransferExchange()} is {@code true}. + * This requires that the objects are serializable. Camel will exclude any non-serializable objects and log it at WARN level. + */ + public void setAllowSerializedHeaders(boolean allowSerializedHeaders) { + this.allowSerializedHeaders = allowSerializedHeaders; + } + public boolean isTransferException() { return transferException; } http://git-wip-us.apache.org/repos/asf/camel/blob/5dd59162/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java index 4b1413e..3151e0e 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java @@ -1106,6 +1106,16 @@ public class JmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Heade } @ManagedAttribute + public boolean isAllowSerializedHeaders() { + return getConfiguration().isAllowSerializedHeaders(); + } + + @ManagedAttribute + public void setAllowSerializedHeaders(boolean allowSerializedHeaders) { + getConfiguration().setAllowSerializedHeaders(allowSerializedHeaders); + } + + @ManagedAttribute public boolean isTransferException() { return getConfiguration().isTransferException(); } 
http://git-wip-us.apache.org/repos/asf/camel/blob/5dd59162/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/ActiveMQPropagateSerializableHeadersTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/ActiveMQPropagateSerializableHeadersTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/ActiveMQPropagateSerializableHeadersTest.java
new file mode 100644
index 0000000..1136a4d
--- /dev/null
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/ActiveMQPropagateSerializableHeadersTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.issues;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jms.CamelJmsTestHelper;
+import org.apache.camel.component.mock.AssertionClause;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ * Checks that serializable header values (a {@link Calendar} and a {@link Map}) reach
+ * {@code mock:result} after a JMS hop sent with {@code transferExchange=true&allowSerializedHeaders=true}.
+ */
+public class ActiveMQPropagateSerializableHeadersTest extends CamelTestSupport {
+
+    protected Object expectedBody = "<time>" + new Date() + "</time>";
+    protected ActiveMQQueue replyQueue = new ActiveMQQueue("test.reply.queue");
+    protected String correlationID = "ABC-123";
+    protected String messageType = getClass().getName();
+    private Calendar calValue;
+    private Map<String, Object> mapValue;
+
+    /** Creates the header values shared by the route and the assertions. */
+    @Before
+    public void setup() {
+        calValue = Calendar.getInstance();
+        mapValue = new LinkedHashMap<String, Object>();
+        mapValue.put("myStringEntry", "stringValue");
+        mapValue.put("myCalEntry", Calendar.getInstance());
+        mapValue.put("myIntEntry", 123);
+    }
+
+    /** Sends one message to {@code activemq:test.a} and verifies the body and headers at {@code mock:result}. */
+    @Test
+    public void testForwardingAMessageAcrossJMSKeepingCustomJMSHeaders() throws Exception {
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedBodiesReceived(expectedBody);
+        AssertionClause firstMessageExpectations = resultEndpoint.message(0);
+        firstMessageExpectations.header("myCal").isEqualTo(calValue);
+        firstMessageExpectations.header("myMap").isEqualTo(mapValue);
+
+        template.sendBody("activemq:test.a", expectedBody);
+        resultEndpoint.assertIsSatisfied();
+
+        Message received = resultEndpoint.getReceivedExchanges().get(0).getIn();
+        assertEquals("myString", "stringValue", received.getHeader("myString", String.class));
+        assertEquals("myCal", calValue, received.getHeader("myCal", Calendar.class));
+        // the raw Map goes straight into assertEquals, so no unchecked Map<String, Object> assignment is needed
+        assertEquals("myMap", mapValue, received.getHeader("myMap", Map.class));
+    }
+
+    /** Registers the JMS component under the name {@code activemq}. */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        // START SNIPPET: example
+        ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
+        camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));
+        // END SNIPPET: example
+
+        return camelContext;
+    }
+
+    /** Sets the headers between {@code test.a} and {@code test.b}, then forwards {@code test.b} to {@code mock:result}. */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("activemq:test.a").process(new Processor() {
+                    @Override
+                    public void process(Exchange exchange) throws Exception {
+                        // set the headers that must survive the hop to test.b
+                        Message in = exchange.getIn();
+                        in.setHeader("myString", "stringValue");
+                        in.setHeader("myMap", mapValue);
+                        in.setHeader("myCal", calValue);
+                    }
+                }).to("activemq:test.b?transferExchange=true&allowSerializedHeaders=true");
+
+                from("activemq:test.b").to("mock:result");
+            }
+        };
+    }
+}