This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9b3fec8ee3a5c6e4c27732936371d9d86bf886b3 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Fri May 24 14:02:27 2019 +0200 CAMEL-13570 - Fixed CS --- .../camel/component/aws/sqs/SqsConfiguration.java | 105 ++++++++++++--------- .../camel/component/aws/sqs/SqsConsumer.java | 48 +++++----- 2 files changed, 85 insertions(+), 68 deletions(-) diff --git a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java index 3a17d7a..4305a01 100644 --- a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java +++ b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java @@ -96,13 +96,13 @@ public class SqsConfiguration implements Cloneable { private Integer receiveMessageWaitTimeSeconds; @UriParam(label = "queue") private String policy; - + // dead letter queue properties @UriParam(label = "queue") private String redrivePolicy; /** - * Whether or not the queue is a FIFO queue + * Whether or not the queue is a FIFO queue */ boolean isFifoQueue() { // AWS docs suggest this is valid derivation. @@ -184,10 +184,12 @@ public class SqsConfiguration implements Cloneable { } /** - * The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved - * by a ReceiveMessage request to set in the com.amazonaws.services.sqs.model.SetQueueAttributesRequest. - * This only make sense if its different from defaultVisibilityTimeout. - * It changes the queue visibility timeout attribute permanently. + * The duration (in seconds) that the received messages are hidden from + * subsequent retrieve requests after being retrieved by a ReceiveMessage + * request to set in the + * com.amazonaws.services.sqs.model.SetQueueAttributesRequest. This only + * make sense if its different from defaultVisibilityTimeout. It changes the + * queue visibility timeout attribute permanently. */ public void setVisibilityTimeout(Integer visibilityTimeout) { this.visibilityTimeout = visibilityTimeout; @@ -198,7 +200,8 @@ public class SqsConfiguration implements Cloneable { } /** - * A list of attribute names to receive when consuming. Multiple names can be separated by comma. + * A list of attribute names to receive when consuming. Multiple names can + * be separated by comma. */ public void setAttributeNames(String attributeNames) { this.attributeNames = attributeNames; @@ -209,7 +212,8 @@ public class SqsConfiguration implements Cloneable { } /** - * A list of message attribute names to receive when consuming. Multiple names can be separated by comma. + * A list of message attribute names to receive when consuming. Multiple + * names can be separated by comma. */ public void setMessageAttributeNames(String messageAttributeNames) { this.messageAttributeNames = messageAttributeNames; @@ -242,7 +246,8 @@ public class SqsConfiguration implements Cloneable { } /** - * Define if you want to apply delaySeconds option to the queue or on single messages + * Define if you want to apply delaySeconds option to the queue or on single + * messages */ public void setDelayQueue(boolean delayQueue) { this.delayQueue = delayQueue; @@ -253,7 +258,8 @@ public class SqsConfiguration implements Cloneable { } /** - * The maximumMessageSize (in bytes) an SQS message can contain for this queue. + * The maximumMessageSize (in bytes) an SQS message can contain for this + * queue. */ public void setMaximumMessageSize(Integer maximumMessageSize) { this.maximumMessageSize = maximumMessageSize; @@ -264,7 +270,8 @@ public class SqsConfiguration implements Cloneable { } /** - * The messageRetentionPeriod (in seconds) a message will be retained by SQS for this queue. + * The messageRetentionPeriod (in seconds) a message will be retained by SQS + * for this queue. */ public void setMessageRetentionPeriod(Integer messageRetentionPeriod) { this.messageRetentionPeriod = messageRetentionPeriod; @@ -286,7 +293,8 @@ public class SqsConfiguration implements Cloneable { } /** - * Specify the policy that send message to DeadLetter queue. See detail at Amazon docs. + * Specify the policy that send message to DeadLetter queue. See detail at + * Amazon docs. */ public void setRedrivePolicy(String redrivePolicy) { this.redrivePolicy = redrivePolicy; @@ -297,8 +305,9 @@ public class SqsConfiguration implements Cloneable { } /** - * If enabled then a scheduled background task will keep extending the message visibility on SQS. - * This is needed if it takes a long time to process the message. If set to true defaultVisibilityTimeout must be set. + * If enabled then a scheduled background task will keep extending the + * message visibility on SQS. This is needed if it takes a long time to + * process the message. If set to true defaultVisibilityTimeout must be set. * See details at Amazon docs. */ public void setExtendMessageVisibility(boolean extendMessageVisibility) { @@ -310,7 +319,8 @@ public class SqsConfiguration implements Cloneable { } /** - * If you do not specify WaitTimeSeconds in the request, the queue attribute ReceiveMessageWaitTimeSeconds is used to determine how long to wait. + * If you do not specify WaitTimeSeconds in the request, the queue attribute + * ReceiveMessageWaitTimeSeconds is used to determine how long to wait. */ public void setReceiveMessageWaitTimeSeconds(Integer receiveMessageWaitTimeSeconds) { this.receiveMessageWaitTimeSeconds = receiveMessageWaitTimeSeconds; @@ -321,7 +331,8 @@ public class SqsConfiguration implements Cloneable { } /** - * Duration in seconds (0 to 20) that the ReceiveMessage action call will wait until a message is in the queue to include in the response. + * Duration in seconds (0 to 20) that the ReceiveMessage action call will + * wait until a message is in the queue to include in the response. */ public void setWaitTimeSeconds(Integer waitTimeSeconds) { this.waitTimeSeconds = waitTimeSeconds; @@ -332,7 +343,8 @@ public class SqsConfiguration implements Cloneable { } /** - * Specify the queue owner aws account id when you need to connect the queue with different account owner. + * Specify the queue owner aws account id when you need to connect the queue + * with different account owner. */ public void setQueueOwnerAWSAccountId(String queueOwnerAWSAccountId) { this.queueOwnerAWSAccountId = queueOwnerAWSAccountId; @@ -343,8 +355,10 @@ public class SqsConfiguration implements Cloneable { } /** - * Whether or not to send the DeleteMessage to the SQS queue if an exchange fails to get through a filter. - * If 'false' and exchange does not make it through a Camel filter upstream in the route, then don't send DeleteMessage. + * Whether or not to send the DeleteMessage to the SQS queue if an exchange + * fails to get through a filter. If 'false' and exchange does not make it + * through a Camel filter upstream in the route, then don't send + * DeleteMessage. */ public void setDeleteIfFiltered(boolean deleteIfFiltered) { this.deleteIfFiltered = deleteIfFiltered; @@ -355,7 +369,8 @@ public class SqsConfiguration implements Cloneable { } /** - * Specify the queue region which could be used with queueOwnerAWSAccountId to build the service URL. + * Specify the queue region which could be used with queueOwnerAWSAccountId + * to build the service URL. */ public void setRegion(String region) { this.region = region; @@ -366,7 +381,8 @@ public class SqsConfiguration implements Cloneable { } /** - * Allows you to use multiple threads to poll the sqs queue to increase throughput + * Allows you to use multiple threads to poll the sqs queue to increase + * throughput */ public void setConcurrentConsumers(int concurrentConsumers) { this.concurrentConsumers = concurrentConsumers; @@ -377,8 +393,9 @@ public class SqsConfiguration implements Cloneable { } /** - * To define the queueUrl explicitly. All other parameters, which would influence the queueUrl, are ignored. - * This parameter is intended to be used, to connect to a mock implementation of SQS, for testing purposes. + * To define the queueUrl explicitly. All other parameters, which would + * influence the queueUrl, are ignored. This parameter is intended to be + * used, to connect to a mock implementation of SQS, for testing purposes. */ public void setQueueUrl(String queueUrl) { this.queueUrl = queueUrl; @@ -411,7 +428,8 @@ public class SqsConfiguration implements Cloneable { } /** - * The ID of an AWS-managed customer master key (CMK) for Amazon SQS or a custom CMK. + * The ID of an AWS-managed customer master key (CMK) for Amazon SQS or a + * custom CMK. */ public void setKmsMasterKeyId(String kmsMasterKeyId) { this.kmsMasterKeyId = kmsMasterKeyId; @@ -422,9 +440,10 @@ public class SqsConfiguration implements Cloneable { } /** - * The length of time, in seconds, for which Amazon SQS can reuse a data key to encrypt or decrypt - * messages before calling AWS KMS again. An integer representing seconds, between 60 seconds (1 minute) - * and 86,400 seconds (24 hours). Default: 300 (5 minutes). + * The length of time, in seconds, for which Amazon SQS can reuse a data key + * to encrypt or decrypt messages before calling AWS KMS again. An integer + * representing seconds, between 60 seconds (1 minute) and 86,400 seconds + * (24 hours). Default: 300 (5 minutes). */ public void setKmsDataKeyReusePeriodSeconds(Integer kmsDataKeyReusePeriodSeconds) { this.kmsDataKeyReusePeriodSeconds = kmsDataKeyReusePeriodSeconds; @@ -442,9 +461,10 @@ public class SqsConfiguration implements Cloneable { } /** - * Only for FIFO queues. Strategy for setting the messageGroupId on the message. - * Can be one of the following options: *useConstant*, *useExchangeId*, *usePropertyValue*. - * For the *usePropertyValue* option, the value of property "CamelAwsMessageGroupId" will be used. + * Only for FIFO queues. Strategy for setting the messageGroupId on the + * message. Can be one of the following options: *useConstant*, + * *useExchangeId*, *usePropertyValue*. For the *usePropertyValue* option, + * the value of property "CamelAwsMessageGroupId" will be used. */ public void setMessageGroupIdStrategy(String strategy) { if ("useConstant".equalsIgnoreCase(strategy)) { @@ -467,9 +487,10 @@ public class SqsConfiguration implements Cloneable { } /** - * Only for FIFO queues. Strategy for setting the messageDeduplicationId on the message. - * Can be one of the following options: *useExchangeId*, *useContentBasedDeduplication*. - * For the *useContentBasedDeduplication* option, no messageDeduplicationId will be set on the message. + * Only for FIFO queues. Strategy for setting the messageDeduplicationId on + * the message. Can be one of the following options: *useExchangeId*, + * *useContentBasedDeduplication*. For the *useContentBasedDeduplication* + * option, no messageDeduplicationId will be set on the message. */ public void setMessageDeduplicationIdStrategy(String strategy) { if ("useExchangeId".equalsIgnoreCase(strategy)) { @@ -480,7 +501,7 @@ public class SqsConfiguration implements Cloneable { throw new IllegalArgumentException("Unrecognised MessageDeduplicationIdStrategy: " + strategy); } } - + public SqsOperations getOperation() { return operation; } @@ -491,23 +512,23 @@ public class SqsConfiguration implements Cloneable { public void setOperation(SqsOperations operation) { this.operation = operation; } - + public boolean isAutoCreateQueue() { - return autoCreateQueue; - } + return autoCreateQueue; + } /** * Setting the autocreation of the queue */ - public void setAutoCreateQueue(boolean autoCreateQueue) { - this.autoCreateQueue = autoCreateQueue; - } - + public void setAutoCreateQueue(boolean autoCreateQueue) { + this.autoCreateQueue = autoCreateQueue; + } + // ************************************************* // // ************************************************* - public SqsConfiguration copy() { + public SqsConfiguration copy() { try { return (SqsConfiguration)super.clone(); } catch (CloneNotSupportedException e) { diff --git a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java index b6892be..be11e6b 100644 --- a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java +++ b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java @@ -52,7 +52,7 @@ import org.apache.camel.util.URISupport; * <a href="http://aws.amazon.com/sqs/">AWS SQS</a> */ public class SqsConsumer extends ScheduledBatchPollingConsumer { - + private ScheduledExecutorService scheduledExecutor; private transient String sqsConsumerToString; private Collection<String> attributeNames; @@ -76,7 +76,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { // must reset for each poll shutdownRunningTask = null; pendingExchanges = 0; - + ReceiveMessageRequest request = new ReceiveMessageRequest(getQueueUrl()); request.setMaxNumberOfMessages(getMaxMessagesPerPoll() > 0 ? getMaxMessagesPerPoll() : null); request.setVisibilityTimeout(getConfiguration().getVisibilityTimeout() != null ? getConfiguration().getVisibilityTimeout() : null); @@ -90,7 +90,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { } log.trace("Receiving messages with request [{}]...", request); - + ReceiveMessageResult messageResult = null; try { messageResult = getClient().receiveMessage(request); @@ -103,16 +103,16 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { if (log.isTraceEnabled()) { log.trace("Received {} messages", messageResult.getMessages().size()); } - + Queue<Exchange> exchanges = createExchanges(messageResult.getMessages()); return processBatch(CastUtils.cast(exchanges)); } public void reConnectToQueue() { try { - if (getEndpoint().getConfiguration().isAutoCreateQueue()) { + if (getEndpoint().getConfiguration().isAutoCreateQueue()) { getEndpoint().createQueue(getClient()); - } + } } catch (QueueDeletedRecentlyException qdr) { log.debug("Queue recently deleted, will retry in 30 seconds."); try { @@ -125,12 +125,12 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { log.warn("Could not connect to queue in amazon.", e); } } - + protected Queue<Exchange> createExchanges(List<Message> messages) { if (log.isTraceEnabled()) { log.trace("Received {} messages in this poll", messages.size()); } - + Queue<Exchange> answer = new LinkedList<>(); for (Message message : messages) { Exchange exchange = getEndpoint().createExchange(message); @@ -139,7 +139,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { return answer; } - + public int processBatch(Queue<Object> exchanges) throws Exception { int total = exchanges.size(); @@ -162,10 +162,10 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { int repeatSeconds = Double.valueOf(visibilityTimeout.doubleValue() * 1.5).intValue(); if (log.isDebugEnabled()) { log.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}", - new Object[]{delay, period, repeatSeconds, exchange.getExchangeId()}); + new Object[] {delay, period, repeatSeconds, exchange.getExchangeId()}); } - final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate( - new TimeoutExtender(exchange, repeatSeconds), delay, period, TimeUnit.SECONDS); + final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(new TimeoutExtender(exchange, repeatSeconds), delay, period, + TimeUnit.SECONDS); exchange.addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { @@ -212,7 +212,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { return total; } - + /** * Strategy to delete the message after being processed. * @@ -237,12 +237,9 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { } private boolean shouldDelete(Exchange exchange) { - boolean shouldDeleteByFilter = exchange.getProperty(Exchange.FILTER_MATCHED) != null - && getConfiguration().isDeleteIfFiltered() - && passedThroughFilter(exchange); + boolean shouldDeleteByFilter = exchange.getProperty(Exchange.FILTER_MATCHED) != null && getConfiguration().isDeleteIfFiltered() && passedThroughFilter(exchange); - return getConfiguration().isDeleteAfterRead() - || shouldDeleteByFilter; + return getConfiguration().isDeleteAfterRead() || shouldDeleteByFilter; } private boolean passedThroughFilter(Exchange exchange) { @@ -264,18 +261,18 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { protected SqsConfiguration getConfiguration() { return getEndpoint().getConfiguration(); } - + protected AmazonSQS getClient() { return getEndpoint().getClient(); } - + protected String getQueueUrl() { return getEndpoint().getQueueUrl(); } - + @Override public SqsEndpoint getEndpoint() { - return (SqsEndpoint) super.getEndpoint(); + return (SqsEndpoint)super.getEndpoint(); } @Override @@ -318,8 +315,8 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { @Override public void run() { - ChangeMessageVisibilityRequest request = new ChangeMessageVisibilityRequest(getQueueUrl(), - exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class), repeatSeconds); + ChangeMessageVisibilityRequest request = new ChangeMessageVisibilityRequest(getQueueUrl(), exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class), + repeatSeconds); try { log.trace("Extending visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange); @@ -330,8 +327,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { } catch (MessageNotInflightException e) { // Ignore. } catch (Exception e) { - log.warn("Extending visibility window failed for exchange " + exchange - + ". Will not attempt to extend visibility further. This exception will be ignored.", e); + log.warn("Extending visibility window failed for exchange " + exchange + ". Will not attempt to extend visibility further. This exception will be ignored.", e); } } }