Fixed CS. Polished. This closes #1119
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f39b83ee Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f39b83ee Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f39b83ee Branch: refs/heads/master Commit: f39b83eebb6133086e01d98c1f3fb3af38f2dd09 Parents: 7ee0977 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Aug 16 16:27:35 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Aug 16 16:27:35 2016 +0200 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitConsumer.java | 55 ++++++++++---------- .../component/rabbitmq/RabbitMQConsumer.java | 3 +- 2 files changed, 28 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f39b83ee/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java index 93e2499..6c20b57 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java @@ -26,7 +26,6 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; - import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Message; @@ -42,7 +41,7 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer { /** Consumer tag for this consumer. */ private volatile String consumerTag; private volatile boolean stopping; - + private final Semaphore lock = new Semaphore(1); /** @@ -59,29 +58,29 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer { log.warn("Unable to open channel for RabbitMQConsumer. Continuing and will try again", e); } } + @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - try { + try { if (!consumer.getEndpoint().isAutoAck()) { - lock.acquire(); + lock.acquire(); } //Channel might be open because while we were waiting for the lock, stop() has been succesfully called. - if (!channel.isOpen()) return; - + if (!channel.isOpen()) { + return; + } + try { doHandleDelivery(consumerTag, envelope, properties, body); } finally { if (!consumer.getEndpoint().isAutoAck()) { - lock.release(); + lock.release(); } - } - - } catch (InterruptedException e) { - log.error("Thread Interrupted!"); - - } - - + } + + } catch (InterruptedException e) { + log.warn("Thread Interrupted!"); + } } public void doHandleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { @@ -186,25 +185,25 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer { channel.basicCancel(tag); } try { - lock.acquire(); + lock.acquire(); if (isChannelOpen()) { channel.close(); } - } catch (TimeoutException e) { + } catch (TimeoutException e) { log.error("Timeout occured"); throw e; } catch (InterruptedException e1) { - log.error("Thread Interrupted!"); + log.error("Thread Interrupted!"); } finally { lock.release(); - - } + + } } /** * Stores the most recently passed-in consumerTag - semantically, there * should be only one. - * + * * @see Consumer#handleConsumeOk */ public void handleConsumeOk(String consumerTag) { @@ -213,7 +212,7 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer { /** * Retrieve the consumer tag. - * + * * @return the most recently notified consumer tag. */ public String getConsumerTag() { @@ -222,31 +221,31 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer { /** * No-op implementation of {@link Consumer#handleCancelOk}. - * + * * @param consumerTag * the defined consumer tag (client- or server-generated) */ public void handleCancelOk(String consumerTag) { // no work to do - log.debug("Recieved cancelOk signal on the rabbitMQ channel"); + log.debug("Received cancelOk signal on the rabbitMQ channel"); } /** * No-op implementation of {@link Consumer#handleCancel(String)} - * + * * @param consumerTag * the defined consumer tag (client- or server-generated) */ public void handleCancel(String consumerTag) throws IOException { // no work to do - log.debug("Recieved cancel signal on the rabbitMQ channel"); + log.debug("Received cancel signal on the rabbitMQ channel"); } /** * No-op implementation of {@link Consumer#handleShutdownSignal}. */ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { - log.info("Recieved shutdown signal on the rabbitMQ channel"); + log.info("Received shutdown signal on the rabbitMQ channel"); // Check if the consumer closed the connection or something else if (!sig.isInitiatedByApplication()) { @@ -277,7 +276,7 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer { */ public void handleRecoverOk(String consumerTag) { // no work to do - log.debug("Recieved recover ok signal on the rabbitMQ channel"); + log.debug("Received recover ok signal on the rabbitMQ channel"); } /** http://git-wip-us.apache.org/repos/asf/camel/blob/f39b83ee/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index 9faffc2..9c02cb7 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -131,8 +131,7 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable { try { consumer.stop(); } catch (TimeoutException e) { - log.error("Timeout occured"); - throw e; + log.warn("Timeout occurred while stopping consumer. This exception is ignored", e); } } this.consumers.clear();