Repository: camel Updated Branches: refs/heads/master a6c7373d4 -> d62e0ac7c
Fix code formatting issues in camel-rabbitmq component. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d62e0ac7 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d62e0ac7 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d62e0ac7 Branch: refs/heads/master Commit: d62e0ac7c7f6b4851d6a756dfc4337981c51441e Parents: a6c7373 Author: Brad Reitmeyer <brrei...@cisco.com> Authored: Tue Jun 9 18:20:46 2015 -0500 Committer: Brad Reitmeyer <git...@bradreitmeyer.com> Committed: Tue Jun 9 18:22:06 2015 -0500 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQComponent.java | 4 +- .../component/rabbitmq/RabbitMQConsumer.java | 6 +- .../component/rabbitmq/RabbitMQEndpoint.java | 5 +- .../rabbitmq/RabbitMQMessageConverter.java | 10 +-- .../component/rabbitmq/RabbitMQProducer.java | 2 +- .../rabbitmq/reply/CorrelationTimeoutMap.java | 21 +++++- .../component/rabbitmq/reply/ReplyHolder.java | 14 ++-- .../component/rabbitmq/reply/ReplyManager.java | 28 ++++++-- .../rabbitmq/reply/ReplyManagerSupport.java | 68 +++++++++++--------- .../reply/TemporaryQueueReplyHandler.java | 12 ++-- .../reply/TemporaryQueueReplyManager.java | 30 ++++----- .../rabbitmq/RabbitMQEndpointTest.java | 20 +++--- .../rabbitmq/RabbitMQInOutIntTest.java | 10 +-- .../testbeans/TestSerializableObject.java | 16 +++++ 14 files changed, 149 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java index c125421..3fe86c5 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java @@ -21,13 +21,13 @@ import java.util.Map; import javax.net.ssl.TrustManager; +import com.rabbitmq.client.ConnectionFactory; + import org.apache.camel.CamelContext; import org.apache.camel.impl.UriEndpointComponent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.rabbitmq.client.ConnectionFactory; - public class RabbitMQComponent extends UriEndpointComponent { private static final Logger LOG = LoggerFactory.getLogger(RabbitMQComponent.class); http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index bf142ce..48ec60f 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -216,14 +216,12 @@ public class RabbitMQConsumer extends DefaultConsumer { log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag); channel.basicAck(deliveryTag, false); } - } - else if (endpoint.isTransferException() && exchange.getPattern().isOutCapable()) { + } else if (endpoint.isTransferException() && exchange.getPattern().isOutCapable()) { // the inOut exchange failed so put the exception in the body and send back msg.setBody(exchange.getException()); exchange.setOut(msg); endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo()); - } - else { + } else { boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class); // processing failed, then reject and handle the exception if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) { http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index 6cd0bca..979efb3 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -224,7 +224,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint { if (hasSerializeHeader(properties)) { Object messageBody = null; - try (InputStream b = new ByteArrayInputStream(body); ObjectInputStream o = new ObjectInputStream(b)) { + try (InputStream b = new ByteArrayInputStream(body); + ObjectInputStream o = new ObjectInputStream(b);) { messageBody = o.readObject(); } catch (IOException | ClassNotFoundException e) { LOG.warn("Could not deserialize the object"); @@ -287,7 +288,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint { properties = getMessageConverter().buildProperties(camelExchange).build(); - try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) { + try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b);) { o.writeObject(msg.getBody()); body = b.toByteArray(); } http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java index 95abe81..4d8f35c 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java @@ -22,15 +22,15 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.LongString; + import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.LongString; - public class RabbitMQMessageConverter { protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQMessageConverter.class); @@ -167,7 +167,9 @@ public class RabbitMQMessageConverter { LOG.debug("Ignoring header: {} with null value", header.getKey()); } else { LOG.debug("Ignoring header: {} of class: {} with value: {}", - new Object[] { header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue() }); + new Object[] { + header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue() + }); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index b8c8ba2..a96d6fd 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -40,6 +40,7 @@ import org.apache.commons.pool.ObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; public class RabbitMQProducer extends DefaultAsyncProducer { + private static final String GENERATED_CORRELATION_ID_PREFIX = "Camel-"; private Connection conn; private ObjectPool<Channel> channelPool; @@ -47,7 +48,6 @@ public class RabbitMQProducer extends DefaultAsyncProducer { private int closeTimeout = 30 * 1000; private final AtomicBoolean started = new AtomicBoolean(false); - private static final String GENERATED_CORRELATION_ID_PREFIX = "Camel-"; private ReplyManager replyManager; public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException { http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java index fad4fc0..8f052a0 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.rabbitmq.reply; import java.util.concurrent.ScheduledExecutorService; @@ -76,8 +92,8 @@ public class CorrelationTimeoutMap extends DefaultTimeoutMap<String, ReplyHandle @Override public ReplyHandler putIfAbsent(String key, ReplyHandler value, long timeoutMillis) { - log.info("in putIfAbsent with key {}", key); - + log.info("in putIfAbsent with key {}", key); + try { if (listener != null) { listener.onPut(key); @@ -117,4 +133,3 @@ public class CorrelationTimeoutMap extends DefaultTimeoutMap<String, ReplyHandle } } - http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java index d2890d0..b6fe68d 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java @@ -16,11 +16,11 @@ */ package org.apache.camel.component.rabbitmq.reply; +import com.rabbitmq.client.AMQP; + import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; -import com.rabbitmq.client.AMQP; - /** * Holder which contains the {@link Exchange} and {@link org.apache.camel.AsyncCallback} to be used * when the reply arrives, so we can set the reply on the {@link Exchange} and continue routing using the callback. @@ -41,7 +41,7 @@ public class ReplyHolder { * Constructor to use when a reply message was received */ public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId, - String correlationId,AMQP.BasicProperties properties, byte[] message) { + String correlationId, AMQP.BasicProperties properties, byte[] message) { this.exchange = exchange; this.callback = callback; this.originalCorrelationId = originalCorrelationId; @@ -54,7 +54,7 @@ public class ReplyHolder { * Constructor to use when a timeout occurred */ public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId, - String correlationId, long timeout) { + String correlationId, long timeout) { this(exchange, callback, originalCorrelationId, correlationId, null, null); this.timeout = timeout; } @@ -112,12 +112,12 @@ public class ReplyHolder { public long getRequestTimeout() { return timeout; } - + /** * The message properties * @return */ - public AMQP.BasicProperties getProperties(){ - return properties; + public AMQP.BasicProperties getProperties() { + return properties; } } http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java index 7c1c015..f6eb64a 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.rabbitmq.reply; import java.util.concurrent.ScheduledExecutorService; @@ -15,7 +31,7 @@ import org.apache.camel.component.rabbitmq.RabbitMQEndpoint; */ public interface ReplyManager { - /** + /** * Sets the belonging {@link org.apache.camel.component.jms.JmsEndpoint}. */ void setEndpoint(RabbitMQEndpoint endpoint); @@ -26,13 +42,13 @@ public interface ReplyManager { * The queue is either a temporary or a persistent queue. */ void setReplyTo(String replyTo); - + /** * Gets the reply to queue being used */ String getReplyTo(); - - /** + + /** * Register a reply * * @param replyManager the reply manager being used @@ -45,13 +61,12 @@ public interface ReplyManager { */ String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout); - + /** * Sets the scheduled to use when checking for timeouts (no reply received within a given time period) */ void setScheduledExecutorService(ScheduledExecutorService executorService); - /** * Updates the correlation id to the new correlation id. * <p/> @@ -65,7 +80,6 @@ public interface ReplyManager { * @param requestTimeout the timeout */ void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout); - /** * Process the reply http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java index 6e41f99..f4f4711 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java @@ -1,9 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.rabbitmq.reply; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Connection; + import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; @@ -17,24 +36,21 @@ import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Connection; - - -public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager{ - - protected final Logger log = LoggerFactory.getLogger(ReplyManagerSupport.class); +public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager { + protected final Logger log = LoggerFactory.getLogger(ReplyManagerSupport.class); protected final CamelContext camelContext; - + protected final CountDownLatch replyToLatch = new CountDownLatch(1); + protected final long replyToTimeout = 1000; + protected ScheduledExecutorService executorService; protected RabbitMQEndpoint endpoint; protected String replyTo; protected Connection listenerContainer; - private int closeTimeout = 30 * 1000; - protected final CountDownLatch replyToLatch = new CountDownLatch(1); - protected final long replyToTimeout = 1000; + protected CorrelationTimeoutMap correlation; + private int closeTimeout = 30 * 1000; + public ReplyManagerSupport(CamelContext camelContext) { this.camelContext = camelContext; } @@ -74,10 +90,9 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl } return replyTo; } - + public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout) { - log.debug("in registerReply"); // add to correlation map QueueReplyHandler handler = new QueueReplyHandler(replyManager, exchange, callback, originalCorrelationId, correlationId, requestTimeout); @@ -89,8 +104,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl } return correlationId; } - - + protected abstract ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout); @@ -109,7 +123,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl } public void processReply(ReplyHolder holder) { - log.info("in processReply"); + log.info("in processReply"); if (holder != null && isRunAllowed()) { try { Exchange exchange = holder.getExchange(); @@ -127,19 +141,17 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received on destination: " + replyTo; exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg)); } else { - - endpoint.setRabbitExchange(exchange, null, holder.getProperties(), holder.getMessage()); - - // restore correlation id in case the remote server messed with it + + endpoint.setRabbitExchange(exchange, null, holder.getProperties(), holder.getMessage()); + + // restore correlation id in case the remote server messed with it if (holder.getOriginalCorrelationId() != null) { - if(exchange.getOut() != null){ + if (exchange.getOut() != null) { exchange.getOut().setHeader(RabbitMQConstants.CORRELATIONID, holder.getOriginalCorrelationId()); - } - else{ + } else { exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, holder.getOriginalCorrelationId()); - } + } } - } } finally { // notify callback @@ -148,8 +160,6 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl } } } - - protected abstract void handleReplyMessage(String correlationID, AMQP.BasicProperties properties, byte[] message); @@ -216,7 +226,6 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl log.debug("Using executor {}", executorService); } - @Override protected void doStop() throws Exception { @@ -227,12 +236,11 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl listenerContainer.close(closeTimeout); listenerContainer = null; } - + // must also stop executor service if (executorService != null) { camelContext.getExecutorServiceManager().shutdownGraceful(executorService); executorService = null; } } - } http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java index c7fcf41..bb0a102 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java @@ -16,13 +16,13 @@ */ package org.apache.camel.component.rabbitmq.reply; +import com.rabbitmq.client.AMQP; + import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; -import org.apache.camel.component.rabbitmq.RabbitMQConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.rabbitmq.client.AMQP; /** * {@link ReplyHandler} to handle processing replies when using temporary queues. @@ -31,8 +31,8 @@ import com.rabbitmq.client.AMQP; */ public class TemporaryQueueReplyHandler implements ReplyHandler { - protected final Logger log = LoggerFactory.getLogger(TemporaryQueueReplyHandler.class); - + protected final Logger log = LoggerFactory.getLogger(TemporaryQueueReplyHandler.class); + // task queue to add the holder so we can process the reply protected final ReplyManager replyManager; protected final Exchange exchange; @@ -54,7 +54,7 @@ public class TemporaryQueueReplyHandler implements ReplyHandler { public void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply) { // create holder object with the the reply - log.info("in onReply with correlationId= {}", correlationId); + log.info("in onReply with correlationId= {}", correlationId); ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, properties, reply); // process the reply replyManager.processReply(holder); @@ -62,7 +62,7 @@ public class TemporaryQueueReplyHandler implements ReplyHandler { public void onTimeout(String correlationId) { // create holder object without the reply which means a timeout occurred - log.info("in onTimeout with correlationId= {}", correlationId); + log.info("in onTimeout with correlationId= {}", correlationId); ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, timeout); // process timeout replyManager.processReply(holder); http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java index 6cae778..4bd1242 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java @@ -17,11 +17,6 @@ package org.apache.camel.component.rabbitmq.reply; import java.io.IOException; -import java.util.Map.Entry; - -import org.apache.camel.AsyncCallback; -import org.apache.camel.CamelContext; -import org.apache.camel.Exchange; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.Queue.DeclareOk; @@ -29,6 +24,9 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Envelope; +import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; /** * A {@link ReplyManager} when using temporary queues. @@ -36,13 +34,13 @@ import com.rabbitmq.client.Envelope; * @version */ public class TemporaryQueueReplyManager extends ReplyManagerSupport { - + private RabbitConsumer consumer; public TemporaryQueueReplyManager(CamelContext camelContext) { super(camelContext); } - + protected ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout) { return new TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, correlationId, requestTimeout); @@ -85,7 +83,7 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { // setup the basicQos if (endpoint.isPrefetchEnabled()) { channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(), - endpoint.isPrefetchGlobal()); + endpoint.isPrefetchGlobal()); } //Let the server pick a random name for us @@ -99,16 +97,16 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { consumer = new RabbitConsumer(this, channel); consumer.start(); - return conn; + return conn; } - + @Override protected void doStop() throws Exception { super.doStop(); consumer.stop(); } - //TODO combine with class in RabbitMQConsumer + //TODO combine with class in RabbitMQConsumer class RabbitConsumer extends com.rabbitmq.client.DefaultConsumer { private final TemporaryQueueReplyManager consumer; @@ -131,9 +129,9 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - consumer.onMessage(properties, body); + consumer.onMessage(properties, body); } - + /** * Bind consumer to channel */ @@ -147,9 +145,9 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { private void stop() throws IOException { if (channel.isOpen()) { if (tag != null) { - channel.basicCancel(tag); - } - channel.close(); + channel.basicCancel(tag); + } + channel.close(); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java index 19c580f..1fa4d17 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java @@ -26,23 +26,23 @@ import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; -import org.apache.camel.Exchange; -import org.apache.camel.impl.JndiRegistry; -import org.apache.camel.test.junit4.CamelTestSupport; -import org.junit.Test; -import org.mockito.Mockito; - import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Address; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.impl.LongStringHelper; +import org.apache.camel.Exchange; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; +import org.mockito.Mockito; + public class RabbitMQEndpointTest extends CamelTestSupport { private Envelope envelope = Mockito.mock(Envelope.class); private AMQP.BasicProperties properties = Mockito.mock(AMQP.BasicProperties.class); - + protected JndiRegistry createRegistry() throws Exception { JndiRegistry registry = super.createRegistry(); registry.bind("argsConfigurer", new ArgsConfigurer() { @@ -75,7 +75,7 @@ public class RabbitMQEndpointTest extends CamelTestSupport { assertEquals(tag, exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_TAG)); assertEquals(body, exchange.getIn().getBody()); } - + @Test public void testExchangeNameIsOptional() throws Exception { RabbitMQEndpoint endpoint1 = context.getEndpoint("rabbitmq:localhost/", RabbitMQEndpoint.class); @@ -83,7 +83,7 @@ public class RabbitMQEndpointTest extends CamelTestSupport { RabbitMQEndpoint endpoint2 = context.getEndpoint("rabbitmq:localhost?autoAck=false", RabbitMQEndpoint.class); assertEquals("Get a wrong exchange name", "", endpoint2.getExchangeName()); - + RabbitMQEndpoint endpoint3 = context.getEndpoint("rabbitmq:localhost/exchange", RabbitMQEndpoint.class); assertEquals("Get a wrong exchange name", "exchange", endpoint3.getExchangeName()); } @@ -156,7 +156,7 @@ public class RabbitMQEndpointTest extends CamelTestSupport { assertTrue(endpoint.isSingleton()); } - + @Test public void testArgConfigurer() throws Exception { RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?queueArgsConfigurer=#argsConfigurer", RabbitMQEndpoint.class); http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java index 5c1223e..9e121f8 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java @@ -108,7 +108,7 @@ public class RabbitMQInOutIntTest extends CamelTestSupport { @Test public void headerTest() throws InterruptedException, IOException { - Map<String, Object> headers = new HashMap<>(); + Map<String, Object> headers = new HashMap<String, Object>(); TestSerializableObject testObject = new TestSerializableObject(); testObject.setName("header"); @@ -124,8 +124,8 @@ public class RabbitMQInOutIntTest extends CamelTestSupport { headers.put("CamelSerialize", true); // populate a map and an arrayList - Map<Object, Object> tmpMap = new HashMap<>(); - List<String> tmpList = new ArrayList<>(); + Map<Object, Object> tmpMap = new HashMap<Object, Object>(); + List<String> tmpList = new ArrayList<String>(); for (int i = 0; i < 3; i++) { String name = "header" + i; tmpList.add(name); @@ -156,13 +156,13 @@ public class RabbitMQInOutIntTest extends CamelTestSupport { foo.setName("foobar"); byte[] body = null; - try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) { + try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b);) { o.writeObject(foo); body = b.toByteArray(); } TestSerializableObject newFoo = null; - try (InputStream b = new ByteArrayInputStream(body); ObjectInputStream o = new ObjectInputStream(b)) { + try (InputStream b = new ByteArrayInputStream(body); ObjectInputStream o = new ObjectInputStream(b);) { newFoo = (TestSerializableObject) o.readObject(); } catch (IOException | ClassNotFoundException e) { } http://git-wip-us.apache.org/repos/asf/camel/blob/d62e0ac7/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java index 1fdffd0..0b0e5cc 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.rabbitmq.testbeans; import java.io.Serializable;