This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new bb40ceb CAMEL-14425: Polling Optimization: Consider number of maximum messages to process earlier during the poll to prevent running through multiple for-loops with all messages (#3508) bb40ceb is described below commit bb40ceb8e2a9b36acfb9d9372b1f276b9d37ffe2 Author: Manuel <48989438+mash-...@users.noreply.github.com> AuthorDate: Fri Jan 24 17:07:55 2020 +0100 CAMEL-14425: Polling Optimization: Consider number of maximum messages to process earlier during the poll to prevent running through multiple for-loops with all messages (#3508) --- .../camel-mail/src/main/docs/mail-component.adoc | 9 +++++ .../apache/camel/component/mail/MailConsumer.java | 46 +++++++++++++++------- 2 files changed, 40 insertions(+), 15 deletions(-) diff --git a/components/camel-mail/src/main/docs/mail-component.adoc b/components/camel-mail/src/main/docs/mail-component.adoc index 095f772..089971e 100644 --- a/components/camel-mail/src/main/docs/mail-component.adoc +++ b/components/camel-mail/src/main/docs/mail-component.adoc @@ -742,3 +742,12 @@ builder.unseen().body(Op.not, "Spam").subject(Op.not, "Spam") SearchTerm term = builder.build(); -------------------------------------------------------------- + +== Polling Optimization + +*Since Camel 3.1* + +The parameter maxMessagePerPoll and fetchSize allow you to restrict the number message that should be processed for each poll. +These parameters should help to prevent bad performance when working with folders that contain a lot of messages. +In previous versions these parameters have been evaluated too late, so that big mailboxes could still cause performance problems. +With Camel 3.1 these parameters are evaluated earlier during the poll to avoid these problems. \ No newline at end of file diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java index 0a87735..aad7939 100644 --- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java +++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java @@ -95,6 +95,27 @@ public class MailConsumer extends ScheduledBatchPollingConsumer { super.doStop(); } + /** + * Returns the max number of messages to be processed. Will return -1 if no maximum is set + */ + private int getMaxNumberOfMessages() { + int fetchSize = getEndpoint().getConfiguration().getFetchSize(); + if (hasMessageLimit(fetchSize)) { + return fetchSize; + } + + int maximumMessagesPerPoll = (getMaxMessagesPerPoll() == 0) ? -1 : getMaxMessagesPerPoll(); + if (hasMessageLimit(maximumMessagesPerPoll)) { + return maximumMessagesPerPoll; + } + + return -1; + } + + private boolean hasMessageLimit(int limitValue) { + return limitValue >= 0; + } + @Override protected int poll() throws Exception { // must reset for each poll @@ -185,13 +206,6 @@ public class MailConsumer extends ScheduledBatchPollingConsumer { @Override public int processBatch(Queue<Object> exchanges) throws Exception { int total = exchanges.size(); - - // limit if needed - if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) { - LOG.debug("Limiting to maximum messages to poll {} as there were {} messages in this poll.", maxMessagesPerPoll, total); - total = maxMessagesPerPoll; - } - for (int index = 0; index < total && isBatchAllowed(); index++) { // only loop if we are started (allowed to run) Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll()); @@ -281,13 +295,22 @@ public class MailConsumer extends ScheduledBatchPollingConsumer { } } + int maxMessage = getMaxNumberOfMessages(); + boolean hasMessageLimit = hasMessageLimit(maxMessage); for (Message message : messages) { + if (hasMessageLimit && answer.size() >= maxMessage) { + break; + } String key = getEndpoint().getMailUidGenerator().generateUuid(getEndpoint(), message); if (isValidMessage(key, message)) { answer.add(new KeyValueHolder<>(key, message)); } } + if (LOG.isDebugEnabled()) { + LOG.debug("Fetching {} messages. Total {} messages.", answer.size(), messages.length); + } + return answer; } @@ -342,14 +365,7 @@ public class MailConsumer extends ScheduledBatchPollingConsumer { protected Queue<Exchange> createExchanges(List<KeyValueHolder<String, Message>> messages) throws MessagingException { Queue<Exchange> answer = new LinkedList<>(); - int fetchSize = getEndpoint().getConfiguration().getFetchSize(); - int count = fetchSize == -1 ? messages.size() : Math.min(fetchSize, messages.size()); - - if (LOG.isDebugEnabled()) { - LOG.debug("Fetching {} messages. Total {} messages.", count, messages.size()); - } - - for (int i = 0; i < count; i++) { + for (int i = 0; i < messages.size(); i++) { try { KeyValueHolder<String, Message> holder = messages.get(i); String key = holder.getKey();