Repository: camel Updated Branches: refs/heads/camel-2.15.x 18859fb05 -> bd10c49bd refs/heads/master 06db3cd24 -> 47c64ec9c
CAMEL-9055: camel-aws - SQS should not allow handover the delete task Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/47c64ec9 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/47c64ec9 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/47c64ec9 Branch: refs/heads/master Commit: 47c64ec9c6b9609a71113ad82b15d2c66463c4cd Parents: 06db3cd Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Aug 5 15:33:43 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Aug 5 15:36:39 2015 +0200 ---------------------------------------------------------------------- .../camel/component/aws/sqs/SqsConsumer.java | 40 ++++++++++++-------- .../camel/component/aws/sqs/SqsProducer.java | 2 +- 2 files changed, 25 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/47c64ec9/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java index 0a8d024..d3e0a25 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java @@ -34,13 +34,13 @@ import com.amazonaws.services.sqs.model.QueueDoesNotExistException; import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; - import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.NoFactoryAvailableException; import org.apache.camel.Processor; import org.apache.camel.impl.ScheduledBatchPollingConsumer; import org.apache.camel.spi.Synchronization; +import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; @@ -141,17 +141,20 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { // schedule task to extend visibility if enabled Integer visibilityTimeout = getConfiguration().getVisibilityTimeout(); - if (this.scheduledExecutor != null && visibilityTimeout != null && (visibilityTimeout.intValue() / 2) > 0) { - int delay = visibilityTimeout.intValue() / 2; - int period = visibilityTimeout.intValue(); + if (this.scheduledExecutor != null && visibilityTimeout != null && (visibilityTimeout / 2) > 0) { + int delay = visibilityTimeout / 2; + int period = visibilityTimeout; int repeatSeconds = new Double(visibilityTimeout.doubleValue() * 1.5).intValue(); if (LOG.isDebugEnabled()) { LOG.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}", new Object[]{delay, period, repeatSeconds, exchange.getExchangeId()}); } + final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate( new TimeoutExtender(exchange, repeatSeconds), delay, period, TimeUnit.SECONDS); - exchange.addOnCompletion(new Synchronization() { + + // as the AWS client is not thread-safe we cannot handover the task + exchange.addOnCompletion(new SynchronizationAdapter() { @Override public void onComplete(Exchange exchange) { cancelExtender(exchange); @@ -162,6 +165,11 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { cancelExtender(exchange); } + @Override + public boolean allowHandover() { + return false; + } + private void cancelExtender(Exchange exchange) { // cancel task as we are done LOG.trace("Processing done so cancelling TimeoutExtender task for exchangeId: {}", exchange.getExchangeId()); @@ -170,24 +178,30 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { }); } - // add on completion to handle after work when the exchange is done - exchange.addOnCompletion(new Synchronization() { + // as the AWS client is not thread-safe we cannot handover the task + exchange.addOnCompletion(new SynchronizationAdapter() { + @Override public void onComplete(Exchange exchange) { processCommit(exchange); } + @Override public void onFailure(Exchange exchange) { processRollback(exchange); } @Override + public boolean allowHandover() { + return false; + } + + @Override public String toString() { return "SqsConsumerOnCompletion"; } }); - LOG.trace("Processing exchange [{}]...", exchange); getAsyncProcessor().process(exchange, new AsyncCallback() { @Override @@ -207,7 +221,6 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { */ protected void processCommit(Exchange exchange) { try { - if (shouldDelete(exchange)) { String receiptHandle = exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class); DeleteMessageRequest deleteRequest = new DeleteMessageRequest(getQueueUrl(), receiptHandle); @@ -224,10 +237,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { } private boolean shouldDelete(Exchange exchange) { - return getConfiguration().isDeleteAfterRead() - && (getConfiguration().isDeleteIfFiltered() - || (!getConfiguration().isDeleteIfFiltered() - && passedThroughFilter(exchange))); + return getConfiguration().isDeleteAfterRead() && (getConfiguration().isDeleteIfFiltered() || passedThroughFilter(exchange)); } private boolean passedThroughFilter(Exchange exchange) { @@ -242,9 +252,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { protected void processRollback(Exchange exchange) { Exception cause = exchange.getException(); if (cause != null) { - LOG.warn("Exchange failed, so rolling back message status: " + exchange, cause); - } else { - LOG.warn("Exchange failed, so rolling back message status: {}", exchange); + getExceptionHandler().handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, cause); } } http://git-wip-us.apache.org/repos/asf/camel/blob/47c64ec9/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java index 69b1eb3..cd16707 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java @@ -66,7 +66,7 @@ public class SqsProducer extends DefaultProducer { private void addDelay(SendMessageRequest request, Exchange exchange) { Integer headerValue = exchange.getIn().getHeader(SqsConstants.DELAY_HEADER, Integer.class); - Integer delayValue = Integer.valueOf(0); + Integer delayValue; if (headerValue == null) { LOG.trace("Using the config delay"); delayValue = getEndpoint().getConfiguration().getDelaySeconds();