Repository: camel
Updated Branches:
  refs/heads/camel-2.15.x 18859fb05 -> bd10c49bd
  refs/heads/master 06db3cd24 -> 47c64ec9c


CAMEL-9055: camel-aws - SQS should not allow handover the delete task


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/47c64ec9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/47c64ec9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/47c64ec9

Branch: refs/heads/master
Commit: 47c64ec9c6b9609a71113ad82b15d2c66463c4cd
Parents: 06db3cd
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Aug 5 15:33:43 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Aug 5 15:36:39 2015 +0200

----------------------------------------------------------------------
 .../camel/component/aws/sqs/SqsConsumer.java    | 40 ++++++++++++--------
 .../camel/component/aws/sqs/SqsProducer.java    |  2 +-
 2 files changed, 25 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/47c64ec9/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
index 0a8d024..d3e0a25 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
@@ -34,13 +34,13 @@ import 
com.amazonaws.services.sqs.model.QueueDoesNotExistException;
 import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
 import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
 import com.amazonaws.services.sqs.model.ReceiveMessageResult;
-
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoFactoryAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
@@ -141,17 +141,20 @@ public class SqsConsumer extends 
ScheduledBatchPollingConsumer {
 
             // schedule task to extend visibility if enabled
             Integer visibilityTimeout = 
getConfiguration().getVisibilityTimeout();
-            if (this.scheduledExecutor != null && visibilityTimeout != null && 
(visibilityTimeout.intValue() / 2) > 0) {
-                int delay = visibilityTimeout.intValue() / 2;
-                int period = visibilityTimeout.intValue();
+            if (this.scheduledExecutor != null && visibilityTimeout != null && 
(visibilityTimeout / 2) > 0) {
+                int delay = visibilityTimeout / 2;
+                int period = visibilityTimeout;
                 int repeatSeconds = new Double(visibilityTimeout.doubleValue() 
* 1.5).intValue();
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Scheduled TimeoutExtender task to start after 
{} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}",
                             new Object[]{delay, period, repeatSeconds, 
exchange.getExchangeId()});
                 }
+
                 final ScheduledFuture<?> scheduledFuture = 
this.scheduledExecutor.scheduleAtFixedRate(
                         new TimeoutExtender(exchange, repeatSeconds), delay, 
period, TimeUnit.SECONDS);
-                exchange.addOnCompletion(new Synchronization() {
+
+                // as the AWS client is not thread-safe we cannot handover the 
task
+                exchange.addOnCompletion(new SynchronizationAdapter() {
                     @Override
                     public void onComplete(Exchange exchange) {
                         cancelExtender(exchange);
@@ -162,6 +165,11 @@ public class SqsConsumer extends 
ScheduledBatchPollingConsumer {
                         cancelExtender(exchange);
                     }
 
+                    @Override
+                    public boolean allowHandover() {
+                        return false;
+                    }
+
                     private void cancelExtender(Exchange exchange) {
                         // cancel task as we are done
                         LOG.trace("Processing done so cancelling 
TimeoutExtender task for exchangeId: {}", exchange.getExchangeId());
@@ -170,24 +178,30 @@ public class SqsConsumer extends 
ScheduledBatchPollingConsumer {
                 });
             }
 
-
             // add on completion to handle after work when the exchange is done
-            exchange.addOnCompletion(new Synchronization() {
+            // as the AWS client is not thread-safe we cannot handover the task
+            exchange.addOnCompletion(new SynchronizationAdapter() {
+                @Override
                 public void onComplete(Exchange exchange) {
                     processCommit(exchange);
                 }
 
+                @Override
                 public void onFailure(Exchange exchange) {
                     processRollback(exchange);
                 }
 
                 @Override
+                public boolean allowHandover() {
+                    return false;
+                }
+
+                @Override
                 public String toString() {
                     return "SqsConsumerOnCompletion";
                 }
             });
 
-
             LOG.trace("Processing exchange [{}]...", exchange);
             getAsyncProcessor().process(exchange, new AsyncCallback() {
                 @Override
@@ -207,7 +221,6 @@ public class SqsConsumer extends 
ScheduledBatchPollingConsumer {
      */
     protected void processCommit(Exchange exchange) {
         try {
-
             if (shouldDelete(exchange)) {
                 String receiptHandle = 
exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class);
                 DeleteMessageRequest deleteRequest = new 
DeleteMessageRequest(getQueueUrl(), receiptHandle);
@@ -224,10 +237,7 @@ public class SqsConsumer extends 
ScheduledBatchPollingConsumer {
     }
 
     private boolean shouldDelete(Exchange exchange) {
-        return getConfiguration().isDeleteAfterRead()
-                && (getConfiguration().isDeleteIfFiltered()
-                    || (!getConfiguration().isDeleteIfFiltered()
-                        && passedThroughFilter(exchange)));
+        return getConfiguration().isDeleteAfterRead() && 
(getConfiguration().isDeleteIfFiltered() || passedThroughFilter(exchange));
     }
 
     private boolean passedThroughFilter(Exchange exchange) {
@@ -242,9 +252,7 @@ public class SqsConsumer extends 
ScheduledBatchPollingConsumer {
     protected void processRollback(Exchange exchange) {
         Exception cause = exchange.getException();
         if (cause != null) {
-            LOG.warn("Exchange failed, so rolling back message status: " + 
exchange, cause);
-        } else {
-            LOG.warn("Exchange failed, so rolling back message status: {}", 
exchange);
+            getExceptionHandler().handleException("Error during processing 
exchange. Will attempt to process the message on next poll.", exchange, cause);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/47c64ec9/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
index 69b1eb3..cd16707 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
@@ -66,7 +66,7 @@ public class SqsProducer extends DefaultProducer {
 
     private void addDelay(SendMessageRequest request, Exchange exchange) {
         Integer headerValue = 
exchange.getIn().getHeader(SqsConstants.DELAY_HEADER, Integer.class);
-        Integer delayValue = Integer.valueOf(0);
+        Integer delayValue;
         if (headerValue == null) {
             LOG.trace("Using the config delay");
             delayValue = getEndpoint().getConfiguration().getDelaySeconds();

Reply via email to