Author: davsclaus Date: Tue Jun 28 11:39:06 2011 New Revision: 1140537 URL: http://svn.apache.org/viewvc?rev=1140537&view=rev Log: CAMEL-3655: Fixed the polling consumer so that it suspends/resumes scheduled-based consumers, to avoid having them keep running in the background after usage. For example, an FTP consumer would otherwise keep polling the FTP server. Introduced PollingConsumerPollingStrategy to control that behavior.
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollingConsumerTest.java camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollingConsumerTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorPollingConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Added: camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java?rev=1140537&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java Tue Jun 28 11:39:06 2011 @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel; + +/** + * Strategy that allows consumers to influence the {@link PollingConsumer}. + */ +public interface PollingConsumerPollingStrategy { + + /** + * Callback invoked when the consumer is started. + * + * @throws Exception can be thrown if error starting. + */ + void onStartup() throws Exception; + + /** + * Callback invoked before the poll. + * + * @throws Exception can be thrown if error occurred + */ + void beforePoll() throws Exception; + + /** + * Callback invoked after the poll. + * + * @throws Exception can be thrown if error occurred + */ + void afterPoll() throws Exception; +} Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java?rev=1140537&r1=1140536&r2=1140537&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java Tue Jun 28 11:39:06 2011 @@ -18,11 +18,13 @@ package org.apache.camel.impl; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.PollingConsumerPollingStrategy; import org.apache.camel.Processor; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.util.ServiceHelper; @@ -56,11 +58,19 @@ public class EventDrivenPollingConsumer } public Exchange receive() { + // must be started + if (!isRunAllowed() || !isStarted()) { + throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name()); + } + while 
(isRunAllowed()) { try { + beforePoll(); return queue.take(); } catch (InterruptedException e) { handleInterruptedException(e); + } finally { + afterPoll(); } } LOG.trace("Consumer is not running, so returning null"); @@ -68,11 +78,24 @@ public class EventDrivenPollingConsumer } public Exchange receive(long timeout) { + // must be started + if (!isRunAllowed() || !isStarted()) { + throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name()); + } + + // if the queue is empty and there is no wait then return null + if (timeout == 0 && queue.isEmpty()) { + return null; + } + try { + beforePoll(); return queue.poll(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { handleInterruptedException(e); return null; + } finally { + afterPoll(); } } @@ -92,10 +115,40 @@ public class EventDrivenPollingConsumer getInterruptedExceptionHandler().handleException(e); } + protected void beforePoll() { + if (consumer instanceof PollingConsumerPollingStrategy) { + PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer; + try { + strategy.beforePoll(); + } catch (Exception e) { + LOG.debug("Error occurred before polling " + consumer + ". This exception will be ignored.", e); + } + } + } + + protected void afterPoll() { + if (consumer instanceof PollingConsumerPollingStrategy) { + PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer; + try { + strategy.afterPoll(); + } catch (Exception e) { + LOG.debug("Error occurred after polling " + consumer + ". 
This exception will be ignored.", e); + } + } + } + protected void doStart() throws Exception { // lets add ourselves as a consumer consumer = getEndpoint().createConsumer(this); - ServiceHelper.startService(consumer); + + // if the consumer has a polling strategy then invoke that + if (consumer instanceof PollingConsumerPollingStrategy) { + PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer; + strategy.onStartup(); + } else { + // for regular consumers start it + ServiceHelper.startService(consumer); + } } protected void doStop() throws Exception { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorPollingConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorPollingConsumer.java?rev=1140537&r1=1140536&r2=1140537&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorPollingConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorPollingConsumer.java Tue Jun 28 11:39:06 2011 @@ -16,6 +16,8 @@ */ package org.apache.camel.impl; +import java.util.concurrent.RejectedExecutionException; + import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -46,6 +48,11 @@ public class ProcessorPollingConsumer ex } public Exchange receive() { + // must be started + if (!isRunAllowed() || !isStarted()) { + throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name()); + } + Exchange exchange = getEndpoint().createExchange(); try { processor.process(exchange); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1140537&r1=1140536&r2=1140537&view=diff 
============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Tue Jun 28 11:39:06 2011 @@ -22,10 +22,12 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.Endpoint; import org.apache.camel.LoggingLevel; +import org.apache.camel.PollingConsumerPollingStrategy; import org.apache.camel.Processor; import org.apache.camel.SuspendableService; import org.apache.camel.spi.PollingConsumerPollStrategy; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +36,7 @@ import org.slf4j.LoggerFactory; * * @version */ -public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService { +public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService, PollingConsumerPollingStrategy { private static final transient Logger LOG = LoggerFactory.getLogger(ScheduledPollConsumer.class); private final ScheduledExecutorService executor; @@ -270,4 +272,23 @@ public abstract class ScheduledPollConsu protected void doSuspend() throws Exception { // dont stop/cancel the future task since we just check in the run method } + + @Override + public void onStartup() throws Exception { + // start our self + ServiceHelper.startService(this); + } + + @Override + public void beforePoll() throws Exception { + // resume our self + ServiceHelper.resumeService(this); + } + + @Override + public void afterPoll() throws Exception { + // suspend our self + ServiceHelper.suspendService(this); + } + } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollingConsumerTest.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollingConsumerTest.java?rev=1140537&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollingConsumerTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollingConsumerTest.java Tue Jun 28 11:39:06 2011 @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import java.io.File; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.PollingConsumer; + +/** + * To test that using polling consumer with file will not keep scheduled file consumer keep running + * in the background. It should suspend/resume the consumer on demand instead. 
+ */ +public class FilePollingConsumerTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + public void testPollingConsumer() throws Exception { + deleteDirectory("target/enrich"); + template.sendBodyAndHeader("file:target/enrich", "Hello World", Exchange.FILE_NAME, "hello.txt"); + + PollingConsumer consumer = context.getEndpoint("file:target/enrich").createPollingConsumer(); + consumer.start(); + Exchange exchange = consumer.receive(5000); + assertNotNull(exchange); + assertEquals("Hello World", exchange.getIn().getBody(String.class)); + + // sleep a bit to ensure polling consumer would be suspended after we have used it + Thread.sleep(1000); + + // drop a new file which should not be picked up by the consumer + template.sendBodyAndHeader("file:target/enrich", "Bye World", Exchange.FILE_NAME, "bye.txt"); + + // sleep a bit to ensure polling consumer would not have picked up that file + Thread.sleep(1000); + + File file = new File("target/enrich/bye.txt").getAbsoluteFile(); + assertTrue("File should exist " + file, file.exists()); + + // and no exchange on consumer as + exchange = consumer.receiveNoWait(); + assertNull(exchange); + + consumer.stop(); + } + +} Added: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollingConsumerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollingConsumerTest.java?rev=1140537&view=auto ============================================================================== --- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollingConsumerTest.java (added) +++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollingConsumerTest.java Tue Jun 28 11:39:06 2011 @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote; + +import java.io.File; + +import org.apache.camel.Exchange; +import org.apache.camel.PollingConsumer; +import org.junit.Test; + +/** + * + */ +public class FtpPollingConsumerTest extends FtpServerTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + private String getFtpUrl() { + return "ftp://admin@localhost:" + getPort() + "/polling?password=admin"; + } + + @Test + public void testPollingConsumer() throws Exception { + template.sendBodyAndHeader(getFtpUrl(), "Hello World", Exchange.FILE_NAME, "hello.txt"); + + PollingConsumer consumer = context.getEndpoint(getFtpUrl()).createPollingConsumer(); + consumer.start(); + Exchange exchange = consumer.receive(5000); + assertNotNull(exchange); + assertEquals("Hello World", exchange.getIn().getBody(String.class)); + + // sleep a bit to ensure polling consumer would be suspended after we have used it + Thread.sleep(1000); + + // drop a new file which should not be picked up by the consumer + template.sendBodyAndHeader(getFtpUrl(), "Bye World", Exchange.FILE_NAME, "bye.txt"); + + // sleep a bit to ensure polling consumer would not have picked up that file + Thread.sleep(1000); + + File file = new File(FTP_ROOT_DIR + 
"polling/bye.txt").getAbsoluteFile(); + assertTrue("File should exist " + file, file.exists()); + + // and no exchange on consumer as + exchange = consumer.receiveNoWait(); + assertNull(exchange); + + consumer.stop(); + } + +}