Author: davsclaus Date: Tue Aug 24 08:48:15 2010 New Revision: 988435 URL: http://svn.apache.org/viewvc?rev=988435&view=rev Log: CAMEL-3072: Added new option throttleEntries. Thanks to Lorrin Nelson for the patch.
Added: camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingUnthrottledTest.java (with props) Modified: camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEndpoint.java camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java Modified: camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java?rev=988435&r1=988434&r2=988435&view=diff ============================================================================== --- camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java (original) +++ camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java Tue Aug 24 08:48:15 2010 @@ -62,8 +62,8 @@ public class AtomEndpoint extends FeedEn } @Override - protected FeedPollingConsumer createEntryPollingConsumer(FeedEndpoint feedEndpoint, Processor processor, boolean filter, Date lastUpdate) { - return new AtomEntryPollingConsumer(this, processor, filter, lastUpdate); + protected FeedPollingConsumer createEntryPollingConsumer(FeedEndpoint feedEndpoint, Processor processor, boolean filter, Date lastUpdate, boolean throttleEntries) { + return new AtomEntryPollingConsumer(this, processor, filter, lastUpdate, throttleEntries); } @Override Modified: camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java?rev=988435&r1=988434&r2=988435&view=diff ============================================================================== --- camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java (original) +++ camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java Tue Aug 24 08:48:15 2010 @@ -34,8 +34,8 @@ import org.apache.camel.component.feed.F public class AtomEntryPollingConsumer extends FeedEntryPollingConsumer { private Document<Feed> document; - public AtomEntryPollingConsumer(AtomEndpoint endpoint, Processor processor, boolean filter, Date lastUpdate) { - super(endpoint, processor, filter, lastUpdate); + public AtomEntryPollingConsumer(AtomEndpoint endpoint, Processor processor, boolean filter, Date lastUpdate, boolean throttleEntries) { + super(endpoint, processor, filter, lastUpdate, throttleEntries); } private Document<Feed> getDocument() throws IOException, ParseException { Modified: camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEndpoint.java?rev=988435&r1=988434&r2=988435&view=diff ============================================================================== --- camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEndpoint.java (original) +++ camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEndpoint.java Tue Aug 24 08:48:15 2010 @@ -36,6 +36,7 @@ public abstract class FeedEndpoint exten protected boolean filter = true; private boolean feedHeader = true; private boolean sortEntries; + private boolean throttleEntries = true; public FeedEndpoint() { } @@ -67,7 +68,7 @@ public abstract class FeedEndpoint exten FeedPollingConsumer answer; if (isSplitEntries()) { - answer = createEntryPollingConsumer(this, processor, filter, lastUpdate); + answer = createEntryPollingConsumer(this, processor, filter, lastUpdate, throttleEntries); } else { answer = createPollingConsumer(this, processor); } @@ -81,7 +82,7 @@ public abstract class FeedEndpoint exten protected abstract FeedPollingConsumer createPollingConsumer(FeedEndpoint feedEndpoint, Processor processor); - protected abstract FeedPollingConsumer createEntryPollingConsumer(FeedEndpoint feedEndpoint, Processor processor, boolean filter, Date lastUpdate); + protected abstract FeedPollingConsumer createEntryPollingConsumer(FeedEndpoint feedEndpoint, Processor processor, boolean filter, Date lastUpdate, boolean throttleEntries); protected Exchange createExchangeWithFeedHeader(Object feed, String header) { Exchange exchange = createExchange(); @@ -192,6 +193,18 @@ public abstract class FeedEndpoint exten return true; } + /** + * Sets whether all entries identified in a single feed poll should be delivered immediately. If true, only one + * entry is processed per consumer.delay. Only applicable when splitEntries = true. + */ + public void setThrottleEntries(boolean throttleEntries) { + this.throttleEntries = throttleEntries; + } + + public boolean isThrottleEntries() { + return this.throttleEntries; + } + // Implementation methods //------------------------------------------------------------------------- Modified: camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java?rev=988435&r1=988434&r2=988435&view=diff ============================================================================== --- camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java (original) +++ camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java Tue Aug 24 08:48:15 2010 @@ -30,12 +30,14 @@ public abstract class FeedEntryPollingCo protected int entryIndex; protected EntryFilter entryFilter; protected List list; + protected boolean throttleEntries; - public FeedEntryPollingConsumer(FeedEndpoint endpoint, Processor processor, boolean filter, Date lastUpdate) { + public FeedEntryPollingConsumer(FeedEndpoint endpoint, Processor processor, boolean filter, Date lastUpdate, boolean throttleEntries) { super(endpoint, processor); if (filter) { entryFilter = createEntryFilter(lastUpdate); } + this.throttleEntries = throttleEntries; } public void poll() throws Exception { @@ -52,8 +54,10 @@ public abstract class FeedEntryPollingCo if (valid) { Exchange exchange = endpoint.createExchange(feed, entry); getProcessor().process(exchange); - // return and wait for the next poll to continue from last time (this consumer is stateful) - return; + if (this.throttleEntries) { + // return and wait for the next poll to continue from last time (this consumer is stateful) + return; + } } } Added: camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingUnthrottledTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingUnthrottledTest.java?rev=988435&view=auto ============================================================================== --- camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingUnthrottledTest.java (added) +++ camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingUnthrottledTest.java Tue Aug 24 08:48:15 2010 @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atom; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * @version $Revision$ + */ +public class AtomPollingUnthrottledTest extends CamelTestSupport { + + @Test + public void testLowDelay() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(7); + mock.setResultWaitTime(3000L); + + assertMockEndpointsSatisfied(); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("atom:file:src/test/data/feed.atom?splitEntries=true&throttleEntries=false&consumer.initialDelay=0").to("mock:result"); + } + }; + } + +} Propchange: camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingUnthrottledTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingUnthrottledTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date