This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new a908fe5  CAMEL-17812: Unfetchable mails should not be counted as part 
of a poll (#7213)
a908fe5 is described below

commit a908fe55eb30933078d1acc94e3d74eaa482e9d3
Author: Manuel <48989438+mash-...@users.noreply.github.com>
AuthorDate: Fri Mar 18 07:01:58 2022 +0100

    CAMEL-17812: Unfetchable mails should not be counted as part of a poll 
(#7213)
---
 .../apache/camel/component/mail/MailConsumer.java  | 117 ++++++++++-----------
 1 file changed, 55 insertions(+), 62 deletions(-)

diff --git 
a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
 
b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
index 0cfa090..1289f42 100644
--- 
a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
+++ 
b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
@@ -161,17 +161,8 @@ public class MailConsumer extends 
ScheduledBatchPollingConsumer {
         try {
             int count = folder.getMessageCount();
             if (count > 0) {
-                List<KeyValueHolder<String, Message>> messages = 
retrieveMessages();
-
-                // need to call setPeek on java-mail to avoid the message 
being flagged eagerly as SEEN on the server in case
-                // we process the message and rollback due an exception
-                if (getEndpoint().getConfiguration().isPeek()) {
-                    for (KeyValueHolder<String, Message> entry : messages) {
-                        Message message = entry.getValue();
-                        peekMessage(message);
-                    }
-                }
-                polledMessages = 
processBatch(CastUtils.cast(createExchanges(messages)));
+                Queue<Exchange> messages = retrieveMessages();
+                polledMessages = processBatch(CastUtils.cast(messages));
 
                 final MailBoxPostProcessAction postProcessor = 
getEndpoint().getPostProcessAction();
                 if (postProcessor != null) {
@@ -292,8 +283,8 @@ public class MailConsumer extends 
ScheduledBatchPollingConsumer {
      *                            endpoint
      * @throws MessagingException If message retrieval fails
      */
-    private List<KeyValueHolder<String, Message>> retrieveMessages() throws 
MessagingException {
-        List<KeyValueHolder<String, Message>> answer = new ArrayList<>();
+    private Queue<Exchange> retrieveMessages() throws MessagingException {
+        Queue<Exchange> answer = new LinkedList<>();
 
         Message[] messages;
         final SortTerm[] sortTerm = getEndpoint().getSortTerm();
@@ -327,7 +318,15 @@ public class MailConsumer extends 
ScheduledBatchPollingConsumer {
             }
             String key = 
getEndpoint().getMailUidGenerator().generateUuid(getEndpoint(), message);
             if (isValidMessage(key, message)) {
-                answer.add(new KeyValueHolder<>(key, message));
+                // need to call setPeek on java-mail to avoid the message 
being flagged eagerly as SEEN on the server in case
+                // we process the message and rollback due an exception
+                if (getEndpoint().getConfiguration().isPeek()) {
+                    peekMessage(message);
+                }
+                Exchange exchange = createExchange(new KeyValueHolder<>(key, 
message));
+                if (exchange != null) {
+                    answer.add(exchange);
+                }
             }
         }
 
@@ -388,63 +387,57 @@ public class MailConsumer extends 
ScheduledBatchPollingConsumer {
         return null;
     }
 
-    protected Queue<Exchange> createExchanges(List<KeyValueHolder<String, 
Message>> messages) throws MessagingException {
-        Queue<Exchange> answer = new LinkedList<>();
-
-        for (int i = 0; i < messages.size(); i++) {
-            try {
-                KeyValueHolder<String, Message> holder = messages.get(i);
-                String key = holder.getKey();
-                Message message = holder.getValue();
+    protected Exchange createExchange(KeyValueHolder<String, Message> holder) 
throws MessagingException {
+        try {
+            String key = holder.getKey();
+            Message message = holder.getValue();
 
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Mail #{} is of type: {} - {}", i, 
ObjectHelper.classCanonicalName(message), message);
-                }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Mail is of type: {} - {}", 
ObjectHelper.classCanonicalName(message), message);
+            }
 
-                if (!message.getFlags().contains(Flags.Flag.DELETED)) {
-                    Exchange exchange = createExchange(message);
-                    if (getEndpoint().getConfiguration().isMapMailMessage()) {
-                        // ensure the mail message is mapped, which can be 
ensured by touching the body/header/attachment
-                        LOG.trace("Mapping #{} from javax.mail.Message to 
Camel MailMessage", i);
-                        exchange.getIn().getBody();
-                        exchange.getIn().getHeaders();
-                        // must also map attachments
-                        try {
-                            Map<String, Attachment> att = new HashMap<>();
-                            
getEndpoint().getBinding().extractAttachmentsFromMail(message, att);
-                            if (!att.isEmpty()) {
-                                
exchange.getIn(AttachmentMessage.class).setAttachmentObjects(att);
-                            }
-                        } catch (MessagingException | IOException e) {
-                            // must release exchange before throwing exception
-                            releaseExchange(exchange, true);
-                            throw new RuntimeCamelException("Error accessing 
attachments due to: " + e.getMessage(), e);
+            if (!message.getFlags().contains(Flags.Flag.DELETED)) {
+                Exchange exchange = createExchange(message);
+                if (getEndpoint().getConfiguration().isMapMailMessage()) {
+                    // ensure the mail message is mapped, which can be ensured 
by touching the body/header/attachment
+                    LOG.trace("Mapping from javax.mail.Message to Camel 
MailMessage");
+                    exchange.getIn().getBody();
+                    exchange.getIn().getHeaders();
+                    // must also map attachments
+                    try {
+                        Map<String, Attachment> att = new HashMap<>();
+                        
getEndpoint().getBinding().extractAttachmentsFromMail(message, att);
+                        if (!att.isEmpty()) {
+                            
exchange.getIn(AttachmentMessage.class).setAttachmentObjects(att);
                         }
+                    } catch (MessagingException | IOException e) {
+                        // must release exchange before throwing exception
+                        releaseExchange(exchange, true);
+                        throw new RuntimeCamelException("Error accessing 
attachments due to: " + e.getMessage(), e);
                     }
+                }
 
-                    // If the protocol is POP3 we need to remember the uid on 
the exchange
-                    // so we can find the mail message again later to be able 
to delete it
-                    // we also need to remember the UUID for idempotent 
repository
-                    exchange.setProperty(MAIL_MESSAGE_UID, key);
+                // If the protocol is POP3 we need to remember the uid on the 
exchange
+                // so we can find the mail message again later to be able to 
delete it
+                // we also need to remember the UUID for idempotent repository
+                exchange.setProperty(MAIL_MESSAGE_UID, key);
 
-                    answer.add(exchange);
-                } else {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Skipping message as it was flagged as 
deleted: {}", MailUtils.dumpMessage(message));
-                    }
-                }
-            } catch (Exception e) {
-                if (skipFailedMessage) {
-                    LOG.debug("Skipping failed message at index {} due {}", i, 
e.getMessage(), e);
-                } else if (handleFailedMessage) {
-                    handleException(e);
-                } else {
-                    throw e;
+                return exchange;
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Skipping message as it was flagged as deleted: 
{}", MailUtils.dumpMessage(message));
                 }
             }
+        } catch (Exception e) {
+            if (skipFailedMessage) {
+                LOG.debug("Skipping failed message due {}", e.getMessage(), e);
+            } else if (handleFailedMessage) {
+                handleException(e);
+            } else {
+                throw e;
+            }
         }
-
-        return answer;
+        return null;
     }
 
     /**

Reply via email to