This is an automated email from the ASF dual-hosted git repository. oalsafi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 06ec6ab Add fix for header override by Azure Storage Blob download consumer (#4949) 06ec6ab is described below commit 06ec6abfd9dfa16e875af8cd0cb6c85bdc2ea318 Author: Mark Andreev <mark.andr...@gmail.com> AuthorDate: Fri Jan 29 18:52:40 2021 +0300 Add fix for header override by Azure Storage Blob download consumer (#4949) --- .../component/azure/storage/queue/QueueConsumer.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java index fad58ec..f391e0a 100644 --- a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java +++ b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.azure.storage.queue; +import java.time.Duration; import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -111,10 +112,24 @@ public class QueueConsumer extends ScheduledBatchPollingConsumer { // update pending number of exchanges pendingExchanges = total - index - 1; + // copy messageId, popReceipt, timeout to guard against the exchange override case + // azure storage blob can override these headers + final String messageId = exchange.getIn() + .getHeader(QueueConstants.MESSAGE_ID, String.class); + final String popReceipt = exchange.getIn() + .getHeader(QueueConstants.POP_RECEIPT, String.class); + final Duration timeout = exchange.getIn() + .getHeader(QueueConstants.TIMEOUT, Duration.class); + // add on completion to handle after work when the exchange is done exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange 
exchange) { + // restore messageId, popReceipt, timeout to guard against the exchange override case + exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, messageId); + exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, popReceipt); + exchange.getIn().setHeader(QueueConstants.TIMEOUT, timeout); + processCommit(exchange); }