This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new a908fe5 CAMEL-17812: Unfetchable mails should not be counted as part of a poll (#7213) a908fe5 is described below commit a908fe55eb30933078d1acc94e3d74eaa482e9d3 Author: Manuel <48989438+mash-...@users.noreply.github.com> AuthorDate: Fri Mar 18 07:01:58 2022 +0100 CAMEL-17812: Unfetchable mails should not be counted as part of a poll (#7213) --- .../apache/camel/component/mail/MailConsumer.java | 117 ++++++++++----------- 1 file changed, 55 insertions(+), 62 deletions(-) diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java index 0cfa090..1289f42 100644 --- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java +++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java @@ -161,17 +161,8 @@ public class MailConsumer extends ScheduledBatchPollingConsumer { try { int count = folder.getMessageCount(); if (count > 0) { - List<KeyValueHolder<String, Message>> messages = retrieveMessages(); - - // need to call setPeek on java-mail to avoid the message being flagged eagerly as SEEN on the server in case - // we process the message and rollback due an exception - if (getEndpoint().getConfiguration().isPeek()) { - for (KeyValueHolder<String, Message> entry : messages) { - Message message = entry.getValue(); - peekMessage(message); - } - } - polledMessages = processBatch(CastUtils.cast(createExchanges(messages))); + Queue<Exchange> messages = retrieveMessages(); + polledMessages = processBatch(CastUtils.cast(messages)); final MailBoxPostProcessAction postProcessor = getEndpoint().getPostProcessAction(); if (postProcessor != null) { @@ -292,8 +283,8 @@ public class MailConsumer extends ScheduledBatchPollingConsumer { * endpoint * @throws MessagingException If message retrieval fails */ - private 
List<KeyValueHolder<String, Message>> retrieveMessages() throws MessagingException { - List<KeyValueHolder<String, Message>> answer = new ArrayList<>(); + private Queue<Exchange> retrieveMessages() throws MessagingException { + Queue<Exchange> answer = new LinkedList<>(); Message[] messages; final SortTerm[] sortTerm = getEndpoint().getSortTerm(); @@ -327,7 +318,15 @@ public class MailConsumer extends ScheduledBatchPollingConsumer { } String key = getEndpoint().getMailUidGenerator().generateUuid(getEndpoint(), message); if (isValidMessage(key, message)) { - answer.add(new KeyValueHolder<>(key, message)); + // need to call setPeek on java-mail to avoid the message being flagged eagerly as SEEN on the server in case + // we process the message and rollback due an exception + if (getEndpoint().getConfiguration().isPeek()) { + peekMessage(message); + } + Exchange exchange = createExchange(new KeyValueHolder<>(key, message)); + if (exchange != null) { + answer.add(exchange); + } } } @@ -388,63 +387,57 @@ public class MailConsumer extends ScheduledBatchPollingConsumer { return null; } - protected Queue<Exchange> createExchanges(List<KeyValueHolder<String, Message>> messages) throws MessagingException { - Queue<Exchange> answer = new LinkedList<>(); - - for (int i = 0; i < messages.size(); i++) { - try { - KeyValueHolder<String, Message> holder = messages.get(i); - String key = holder.getKey(); - Message message = holder.getValue(); + protected Exchange createExchange(KeyValueHolder<String, Message> holder) throws MessagingException { + try { + String key = holder.getKey(); + Message message = holder.getValue(); - if (LOG.isTraceEnabled()) { - LOG.trace("Mail #{} is of type: {} - {}", i, ObjectHelper.classCanonicalName(message), message); - } + if (LOG.isTraceEnabled()) { + LOG.trace("Mail is of type: {} - {}", ObjectHelper.classCanonicalName(message), message); + } - if (!message.getFlags().contains(Flags.Flag.DELETED)) { - Exchange exchange = createExchange(message); - 
if (getEndpoint().getConfiguration().isMapMailMessage()) { - // ensure the mail message is mapped, which can be ensured by touching the body/header/attachment - LOG.trace("Mapping #{} from javax.mail.Message to Camel MailMessage", i); - exchange.getIn().getBody(); - exchange.getIn().getHeaders(); - // must also map attachments - try { - Map<String, Attachment> att = new HashMap<>(); - getEndpoint().getBinding().extractAttachmentsFromMail(message, att); - if (!att.isEmpty()) { - exchange.getIn(AttachmentMessage.class).setAttachmentObjects(att); - } - } catch (MessagingException | IOException e) { - // must release exchange before throwing exception - releaseExchange(exchange, true); - throw new RuntimeCamelException("Error accessing attachments due to: " + e.getMessage(), e); + if (!message.getFlags().contains(Flags.Flag.DELETED)) { + Exchange exchange = createExchange(message); + if (getEndpoint().getConfiguration().isMapMailMessage()) { + // ensure the mail message is mapped, which can be ensured by touching the body/header/attachment + LOG.trace("Mapping from javax.mail.Message to Camel MailMessage"); + exchange.getIn().getBody(); + exchange.getIn().getHeaders(); + // must also map attachments + try { + Map<String, Attachment> att = new HashMap<>(); + getEndpoint().getBinding().extractAttachmentsFromMail(message, att); + if (!att.isEmpty()) { + exchange.getIn(AttachmentMessage.class).setAttachmentObjects(att); } + } catch (MessagingException | IOException e) { + // must release exchange before throwing exception + releaseExchange(exchange, true); + throw new RuntimeCamelException("Error accessing attachments due to: " + e.getMessage(), e); } + } - // If the protocol is POP3 we need to remember the uid on the exchange - // so we can find the mail message again later to be able to delete it - // we also need to remember the UUID for idempotent repository - exchange.setProperty(MAIL_MESSAGE_UID, key); + // If the protocol is POP3 we need to remember the uid on the 
exchange + // so we can find the mail message again later to be able to delete it + // we also need to remember the UUID for idempotent repository + exchange.setProperty(MAIL_MESSAGE_UID, key); - answer.add(exchange); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping message as it was flagged as deleted: {}", MailUtils.dumpMessage(message)); - } - } - } catch (Exception e) { - if (skipFailedMessage) { - LOG.debug("Skipping failed message at index {} due {}", i, e.getMessage(), e); - } else if (handleFailedMessage) { - handleException(e); - } else { - throw e; + return exchange; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping message as it was flagged as deleted: {}", MailUtils.dumpMessage(message)); } } + } catch (Exception e) { + if (skipFailedMessage) { + LOG.debug("Skipping failed message due {}", e.getMessage(), e); + } else if (handleFailedMessage) { + handleException(e); + } else { + throw e; + } } - - return answer; + return null; } /**