Repository: camel Updated Branches: refs/heads/camel-2.16.x 0dcd18e79 -> a16cd6a71 refs/heads/master 435e5bfaf -> bf2fc7a0c
CAMEL-9199 RabbitMQ Consumer threads crash when sending partially serializable objects Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bf2fc7a0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bf2fc7a0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bf2fc7a0 Branch: refs/heads/master Commit: bf2fc7a0c612f5336c7926e07bc75a0e0bf0726a Parents: 435e5bf Author: Brad Reitmeyer <git...@bradreitmeyer.com> Authored: Wed Oct 7 16:15:45 2015 -0500 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Oct 8 07:31:39 2015 +0200 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQConsumer.java | 13 ++++- .../component/rabbitmq/RabbitMQEndpoint.java | 8 ++- .../rabbitmq/RabbitMQInOutIntTest.java | 26 +++++++++- .../testbeans/TestNonSerializableObject.java | 39 +++++++++++++++ .../TestPartiallySerializableObject.java | 52 ++++++++++++++++++++ 5 files changed, 132 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/bf2fc7a0/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index a6027ea..4343554 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -33,6 +33,7 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Message; import org.apache.camel.Processor; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.impl.DefaultConsumer; @@ -216,7 +217,11 @@ public class RabbitMQConsumer extends DefaultConsumer { if (!exchange.isFailed()) { // processing success if (sendReply && exchange.getPattern().isOutCapable()) { - endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo()); + try { + endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo()); + } catch (RuntimeCamelException e) { + getExceptionHandler().handleException("Error processing exchange", exchange, e); + } } if (!consumer.endpoint.isAutoAck()) { log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag); @@ -226,7 +231,11 @@ public class RabbitMQConsumer extends DefaultConsumer { // the inOut exchange failed so put the exception in the body and send back msg.setBody(exchange.getException()); exchange.setOut(msg); - endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo()); + try { + endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo()); + } catch (RuntimeCamelException e) { + getExceptionHandler().handleException("Error processing exchange", exchange, e); + } } else { boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class); // processing failed, then reject and handle the exception http://git-wip-us.apache.org/repos/asf/camel/blob/bf2fc7a0/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index 271ac6c..50aebd2 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -20,6 +20,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.NotSerializableException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; @@ -49,6 +50,7 @@ import org.apache.camel.Message; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.TypeConversionException; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.DefaultMessage; @@ -284,13 +286,16 @@ public class RabbitMQEndpoint extends DefaultEndpoint { try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b);) { o.writeObject(msg.getBody()); body = b.toByteArray(); + } catch (NotSerializableException nse) { + LOG.warn("Can not send object " + msg.getBody().getClass() + " via RabbitMQ because it contains non-serializable objects."); + throw new RuntimeCamelException(e); } } else if (msg.getBody() == null) { properties = getMessageConverter().buildProperties(camelExchange).build(); body = null; } else { LOG.warn("Could not convert {} to byte[]", msg.getBody()); - throw new IOException(e); + throw new RuntimeCamelException(e); } } String rabbitExchange = getExchangeName(msg); @@ -299,7 +304,6 @@ public class RabbitMQEndpoint extends DefaultEndpoint { Boolean immediate = camelExchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, isImmediate(), Boolean.class); LOG.debug("Sending message to exchange: {} with CorrelationId = {}", rabbitExchange, properties.getCorrelationId()); - channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, properties, body); } http://git-wip-us.apache.org/repos/asf/camel/blob/bf2fc7a0/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java index 7d09545..51b957f 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java @@ -35,12 +35,14 @@ import org.apache.camel.Processor; import org.apache.camel.Produce; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.rabbitmq.testbeans.TestNonSerializableObject; +import org.apache.camel.component.rabbitmq.testbeans.TestPartiallySerializableObject; import org.apache.camel.component.rabbitmq.testbeans.TestSerializableObject; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; public class RabbitMQInOutIntTest extends CamelTestSupport { - + public static final String ROUTING_KEY = "rk5"; public static final long TIMEOUT_MS = 2000; private static final String EXCHANGE = "ex5"; @@ -51,7 +53,8 @@ public class RabbitMQInOutIntTest extends CamelTestSupport { @Produce(uri = "direct:rabbitMQ") protected ProducerTemplate directProducer; - @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?exchangeType=direct&username=cameltest&password=cameltest" + "&autoAck=true&queue=q4&routingKey=" + ROUTING_KEY + @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?threadPoolSize=1&exchangeType=direct&username=cameltest&password=cameltest" + + "&autoAck=true&queue=q4&routingKey=" + ROUTING_KEY + "&transferException=true&requestTimeout=" + TIMEOUT_MS) private Endpoint rabbitMQEndpoint; @@ -69,6 +72,10 @@ public class RabbitMQInOutIntTest extends CamelTestSupport { if (exchange.getIn().getBody(TestSerializableObject.class) != null) { TestSerializableObject foo = exchange.getIn().getBody(TestSerializableObject.class); foo.setDescription("foobar"); + } else if (exchange.getIn().getBody(TestPartiallySerializableObject.class) != null) { + TestPartiallySerializableObject foo = exchange.getIn().getBody(TestPartiallySerializableObject.class); + foo.setNonSerializableObject(new TestNonSerializableObject()); + foo.setDescription("foobar"); } else if (exchange.getIn().getBody(String.class) != null) { if (exchange.getIn().getBody(String.class).contains("header")) { assertEquals(exchange.getIn().getHeader("String"), "String"); @@ -149,6 +156,21 @@ public class RabbitMQInOutIntTest extends CamelTestSupport { } @Test + public void partiallySerializeTest() throws InterruptedException, IOException { + TestPartiallySerializableObject foo = new TestPartiallySerializableObject(); + foo.setName("foobar"); + + try { + TestPartiallySerializableObject reply = template.requestBodyAndHeader("direct:rabbitMQ", foo, RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, TestPartiallySerializableObject.class); + } catch (CamelExecutionException e) { + // expected + } + // Make sure we didn't crash the one Consumer thread + String reply2 = template.requestBodyAndHeader("direct:rabbitMQ", "partiallySerializeTest1", RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, String.class); + assertEquals("partiallySerializeTest1 response", reply2); + } + + @Test public void testSerializableObject() throws IOException { TestSerializableObject foo = new TestSerializableObject(); foo.setName("foobar"); http://git-wip-us.apache.org/repos/asf/camel/blob/bf2fc7a0/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestNonSerializableObject.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestNonSerializableObject.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestNonSerializableObject.java new file mode 100644 index 0000000..92695ac --- /dev/null +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestNonSerializableObject.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq.testbeans; + +public class TestNonSerializableObject { + + private String description; + private String name; + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/bf2fc7a0/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestPartiallySerializableObject.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestPartiallySerializableObject.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestPartiallySerializableObject.java new file mode 100644 index 0000000..44decc8 --- /dev/null +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestPartiallySerializableObject.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq.testbeans; + +import java.io.Serializable; + +public class TestPartiallySerializableObject implements Serializable { + private static final long serialVersionUID = 1L; + + private String description; + private String name; + private TestNonSerializableObject nonSerializableObject; + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public TestNonSerializableObject getNonSerializableObject() { + return nonSerializableObject; + } + + public void setNonSerializableObject(TestNonSerializableObject nonSerializableObject) { + this.nonSerializableObject = nonSerializableObject; + } + +} \ No newline at end of file