CAMEL-7247: mail consumer: add support for an idempotent repository so you can concurrently poll the same mailbox.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/56ccbef1 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/56ccbef1 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/56ccbef1 Branch: refs/heads/master Commit: 56ccbef11d794c413807ffc3f029a089f0c6542f Parents: 4c49932 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Feb 20 12:20:07 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Feb 20 12:22:02 2016 +0100 ---------------------------------------------------------------------- .../component/mail/DefaultMailUidGenerator.java | 6 ++--- .../camel/component/mail/MailConsumer.java | 28 +++++++++----------- 2 files changed, 15 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/56ccbef1/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java ---------------------------------------------------------------------- diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java index 58772ea..829d408 100644 --- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java +++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. http://git-wip-us.apache.org/repos/asf/camel/blob/56ccbef1/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java index 8ea2260..fc1417e 100644 --- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java +++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java @@ -17,11 +17,8 @@ package org.apache.camel.component.mail; import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Queue; import javax.mail.Flags; import javax.mail.Folder; @@ -40,6 +37,7 @@ import org.apache.camel.impl.ScheduledBatchPollingConsumer; import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.util.CastUtils; import org.apache.camel.util.IntrospectionSupport; +import org.apache.camel.util.KeyValueHolder; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,12 +115,13 @@ public class MailConsumer extends ScheduledBatchPollingConsumer { try { int count = folder.getMessageCount(); if (count > 0) { - Map<String, Message> messages = retrieveMessages(); + List<KeyValueHolder<String, Message>> messages = retrieveMessages(); // need to call setPeek on java-mail to avoid the message being 
flagged eagerly as SEEN on the server in case // we process the message and rollback due an exception if (getEndpoint().getConfiguration().isPeek()) { - for (Message message : messages.values()) { + for (KeyValueHolder<String, Message> entry : messages) { + Message message = entry.getValue(); peekMessage(message); } } @@ -242,8 +241,8 @@ public class MailConsumer extends ScheduledBatchPollingConsumer { * @return Messages from input folder according to the search and sort criteria stored in the endpoint * @throws MessagingException If message retrieval fails */ - private Map<String, Message> retrieveMessages() throws MessagingException { - Map<String, Message> answer = new LinkedHashMap<String, Message>(); + private List<KeyValueHolder<String, Message>> retrieveMessages() throws MessagingException { + List<KeyValueHolder<String, Message>> answer = new ArrayList<>(); Message[] messages; final SortTerm[] sortTerm = getEndpoint().getSortTerm(); @@ -272,7 +271,7 @@ public class MailConsumer extends ScheduledBatchPollingConsumer { for (Message message : messages) { String key = getEndpoint().getMailUidGenerator().generateUuid(getEndpoint(), message); if (isValidMessage(key, message)) { - answer.put(key, message); + answer.add(new KeyValueHolder<>(key, message)); } } @@ -327,7 +326,7 @@ public class MailConsumer extends ScheduledBatchPollingConsumer { return null; } - protected Queue<Exchange> createExchanges(Map<String, Message> messages) throws MessagingException { + protected Queue<Exchange> createExchanges(List<KeyValueHolder<String, Message>> messages) throws MessagingException { Queue<Exchange> answer = new LinkedList<Exchange>(); int fetchSize = getEndpoint().getConfiguration().getFetchSize(); @@ -337,13 +336,10 @@ public class MailConsumer extends ScheduledBatchPollingConsumer { LOG.debug("Fetching {} messages. 
Total {} messages.", count, messages.size()); } - Iterator<String> it = messages.keySet().iterator(); - int i = 0; - while (i < count) { - i++; - - String key = it.next(); - Message message = messages.get(key); + for (int i = 0; i < count; i++) { + KeyValueHolder<String, Message> holder = messages.get(i); + String key = holder.getKey(); + Message message = holder.getValue(); if (LOG.isTraceEnabled()) { LOG.trace("Mail #{} is of type: {} - {}", new Object[]{i, ObjectHelper.classCanonicalName(message), message});