Repository: camel
Updated Branches:
  refs/heads/camel-2.16.x 0dcd18e79 -> a16cd6a71
  refs/heads/master 435e5bfaf -> bf2fc7a0c


CAMEL-9199 RabbitMQ Consumer threads crash when sending partially serializable objects


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bf2fc7a0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bf2fc7a0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bf2fc7a0

Branch: refs/heads/master
Commit: bf2fc7a0c612f5336c7926e07bc75a0e0bf0726a
Parents: 435e5bf
Author: Brad Reitmeyer <git...@bradreitmeyer.com>
Authored: Wed Oct 7 16:15:45 2015 -0500
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Oct 8 07:31:39 2015 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    | 13 ++++-
 .../component/rabbitmq/RabbitMQEndpoint.java    |  8 ++-
 .../rabbitmq/RabbitMQInOutIntTest.java          | 26 +++++++++-
 .../testbeans/TestNonSerializableObject.java    | 39 +++++++++++++++
 .../TestPartiallySerializableObject.java        | 52 ++++++++++++++++++++
 5 files changed, 132 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bf2fc7a0/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index a6027ea..4343554 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -33,6 +33,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.DefaultConsumer;
 
 
@@ -216,7 +217,11 @@ public class RabbitMQConsumer extends DefaultConsumer {
             if (!exchange.isFailed()) {
                 // processing success
                 if (sendReply && exchange.getPattern().isOutCapable()) {
-                    endpoint.publishExchangeToChannel(exchange, channel, 
properties.getReplyTo());
+                    try {
+                        endpoint.publishExchangeToChannel(exchange, channel, 
properties.getReplyTo());
+                    } catch (RuntimeCamelException e) {
+                        getExceptionHandler().handleException("Error 
processing exchange", exchange, e);
+                    }
                 }
                 if (!consumer.endpoint.isAutoAck()) {
                     log.trace("Acknowledging receipt [delivery_tag={}]", 
deliveryTag);
@@ -226,7 +231,11 @@ public class RabbitMQConsumer extends DefaultConsumer {
                 // the inOut exchange failed so put the exception in the body 
and send back
                 msg.setBody(exchange.getException());
                 exchange.setOut(msg);
-                endpoint.publishExchangeToChannel(exchange, channel, 
properties.getReplyTo());
+                try {
+                    endpoint.publishExchangeToChannel(exchange, channel, 
properties.getReplyTo());
+                } catch (RuntimeCamelException e) {
+                    getExceptionHandler().handleException("Error processing 
exchange", exchange, e);
+                }
             } else {
                 boolean isRequeueHeaderSet = 
msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class);
                 // processing failed, then reject and handle the exception

http://git-wip-us.apache.org/repos/asf/camel/blob/bf2fc7a0/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 271ac6c..50aebd2 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -20,6 +20,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.NotSerializableException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
@@ -49,6 +50,7 @@ import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.TypeConversionException;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultMessage;
@@ -284,13 +286,16 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
                 try (ByteArrayOutputStream b = new ByteArrayOutputStream(); 
ObjectOutputStream o = new ObjectOutputStream(b);) {
                     o.writeObject(msg.getBody());
                     body = b.toByteArray();
+                } catch (NotSerializableException nse) {
+                    LOG.warn("Can not send object " + msg.getBody().getClass() 
+ " via RabbitMQ because it contains non-serializable objects.");
+                    throw new RuntimeCamelException(e);
                 }
             } else if (msg.getBody() == null) {
                 properties = 
getMessageConverter().buildProperties(camelExchange).build();
                 body = null;
             } else {
                 LOG.warn("Could not convert {} to byte[]", msg.getBody());
-                throw new IOException(e);
+                throw new RuntimeCamelException(e);
             }
         }
         String rabbitExchange = getExchangeName(msg);
@@ -299,7 +304,6 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
         Boolean immediate = 
camelExchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, isImmediate(), 
Boolean.class);
 
         LOG.debug("Sending message to exchange: {} with CorrelationId = {}", 
rabbitExchange, properties.getCorrelationId());
-
         channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, 
properties, body);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/bf2fc7a0/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
index 7d09545..51b957f 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
@@ -35,12 +35,14 @@ import org.apache.camel.Processor;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.rabbitmq.testbeans.TestNonSerializableObject;
+import 
org.apache.camel.component.rabbitmq.testbeans.TestPartiallySerializableObject;
 import org.apache.camel.component.rabbitmq.testbeans.TestSerializableObject;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
 public class RabbitMQInOutIntTest extends CamelTestSupport {
-    
+
     public static final String ROUTING_KEY = "rk5";
     public static final long TIMEOUT_MS = 2000;
     private static final String EXCHANGE = "ex5";
@@ -51,7 +53,8 @@ public class RabbitMQInOutIntTest extends CamelTestSupport {
     @Produce(uri = "direct:rabbitMQ")
     protected ProducerTemplate directProducer;
 
-    @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + 
"?exchangeType=direct&username=cameltest&password=cameltest" + 
"&autoAck=true&queue=q4&routingKey=" + ROUTING_KEY
+    @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + 
"?threadPoolSize=1&exchangeType=direct&username=cameltest&password=cameltest"
+                    + "&autoAck=true&queue=q4&routingKey=" + ROUTING_KEY
                     + "&transferException=true&requestTimeout=" + TIMEOUT_MS)
     private Endpoint rabbitMQEndpoint;
 
@@ -69,6 +72,10 @@ public class RabbitMQInOutIntTest extends CamelTestSupport {
                         if 
(exchange.getIn().getBody(TestSerializableObject.class) != null) {
                             TestSerializableObject foo = 
exchange.getIn().getBody(TestSerializableObject.class);
                             foo.setDescription("foobar");
+                        } else if 
(exchange.getIn().getBody(TestPartiallySerializableObject.class) != null) {
+                            TestPartiallySerializableObject foo = 
exchange.getIn().getBody(TestPartiallySerializableObject.class);
+                            foo.setNonSerializableObject(new 
TestNonSerializableObject());
+                            foo.setDescription("foobar");
                         } else if (exchange.getIn().getBody(String.class) != 
null) {
                             if 
(exchange.getIn().getBody(String.class).contains("header")) {
                                 
assertEquals(exchange.getIn().getHeader("String"), "String");
@@ -149,6 +156,21 @@ public class RabbitMQInOutIntTest extends CamelTestSupport 
{
     }
 
     @Test
+    public void partiallySerializeTest() throws InterruptedException, 
IOException {
+        TestPartiallySerializableObject foo = new 
TestPartiallySerializableObject();
+        foo.setName("foobar");
+
+        try {
+            TestPartiallySerializableObject reply = 
template.requestBodyAndHeader("direct:rabbitMQ", foo, 
RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, 
TestPartiallySerializableObject.class);
+        } catch (CamelExecutionException e) {
+            // expected
+        }
+        // Make sure we didn't crash the one Consumer thread
+        String reply2 = template.requestBodyAndHeader("direct:rabbitMQ", 
"partiallySerializeTest1", RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, 
String.class);
+        assertEquals("partiallySerializeTest1 response", reply2);
+    }
+
+    @Test
     public void testSerializableObject() throws IOException {
         TestSerializableObject foo = new TestSerializableObject();
         foo.setName("foobar");

http://git-wip-us.apache.org/repos/asf/camel/blob/bf2fc7a0/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestNonSerializableObject.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestNonSerializableObject.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestNonSerializableObject.java
new file mode 100644
index 0000000..92695ac
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestNonSerializableObject.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.testbeans;
+
+public class TestNonSerializableObject {
+
+    private String description;
+    private String name;
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/bf2fc7a0/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestPartiallySerializableObject.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestPartiallySerializableObject.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestPartiallySerializableObject.java
new file mode 100644
index 0000000..44decc8
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestPartiallySerializableObject.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.testbeans;
+
+import java.io.Serializable;
+
+public class TestPartiallySerializableObject implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private String description;
+    private String name;
+    private TestNonSerializableObject nonSerializableObject;
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public TestNonSerializableObject getNonSerializableObject() {
+        return nonSerializableObject;
+    }
+
+    public void setNonSerializableObject(TestNonSerializableObject 
nonSerializableObject) {
+        this.nonSerializableObject = nonSerializableObject;
+    }
+
+}
\ No newline at end of file

Reply via email to