Updated Branches: refs/heads/master f3285bee4 -> ebeca860c
CAMEL-4876: Add support for a back-off multiplier capability to the ScheduledPollConsumer

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ebeca860
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ebeca860
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ebeca860

Branch: refs/heads/master
Commit: ebeca860cf3d05e979645d730ecc7e306c34870c
Parents: f3285be
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Aug 16 13:38:56 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Aug 16 13:39:13 2013 +0200

----------------------------------------------------------------------
 .../mbean/ManagedSchedulePollConsumerMBean.java |  13 ++
 .../camel/impl/LoggingExceptionHandler.java     |   2 +-
 .../camel/impl/ScheduledPollConsumer.java       |  98 +++++++++++++--
 .../camel/impl/ScheduledPollEndpoint.java       |  15 ++-
 .../mbean/ManagedScheduledPollConsumer.java     |  15 +++
 .../file/FileConsumeBackoffMultiplierTest.java  |  53 ++++++++
 .../impl/ScheduledPollConsumerBackoffTest.java  | 126 +++++++++++++++++++
 .../ManagedScheduledPollConsumerTest.java       |  16 ++-
 8 files changed, 326 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/ebeca860/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java
index 3179ee6..27949ed 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java
@@ -20,6 +20,7 @@ import java.util.Map;
import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedOperation; +import org.apache.camel.spi.UriParam; public interface ManagedSchedulePollConsumerMBean extends ManagedConsumerMBean { @@ -56,4 +57,16 @@ public interface ManagedSchedulePollConsumerMBean extends ManagedConsumerMBean { @ManagedAttribute(description = "Scheduler classname") String getSchedulerClassName(); + @ManagedAttribute(description = "Backoff multiplier") + int getBackoffMultiplier(); + + @ManagedAttribute(description = "Backoff idle threshold") + int getBackoffIdleThreshold(); + + @ManagedAttribute(description = "Backoff error threshold") + int getBackoffErrorThreshold(); + + @ManagedAttribute(description = "Current backoff counter") + int getBackoffCounter(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/ebeca860/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java b/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java index 5f3406a..6518ad4 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java +++ b/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java @@ -25,7 +25,7 @@ import org.apache.camel.util.CamelLogger; import org.slf4j.LoggerFactory; /** - * A default implementation of {@link ExceptionHandler} which uses a {@link org.apache.camel.processor.CamelLogger} to + * A default implementation of {@link ExceptionHandler} which uses a {@link org.apache.camel.util.CamelLogger} to * log the exception. * <p/> * This implementation will by default log the exception with stack trace at WARN level. 
http://git-wip-us.apache.org/repos/asf/camel/blob/ebeca860/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java index fd07878..6b34ac7 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java @@ -39,8 +39,6 @@ import org.slf4j.LoggerFactory; /** * A useful base class for any consumer which is polling based - * - * @version */ public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService, PollingConsumerPollingStrategy { private static final Logger LOG = LoggerFactory.getLogger(ScheduledPollConsumer.class); @@ -67,9 +65,20 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R private boolean sendEmptyMessageWhenIdle; @UriParam private boolean greedy; - private volatile boolean polling; + @UriParam + private int backoffMultiplier; + @UriParam + private int backoffIdleThreshold; + @UriParam + private int backoffErrorThreshold; private Map<String, Object> schedulerProperties; + // state during running + private volatile boolean polling; + private volatile int backoffCounter; + private volatile long idleCounter; + private volatile long errorCounter; + public ScheduledPollConsumer(Endpoint endpoint, Processor processor) { super(endpoint, processor); } @@ -129,9 +138,32 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R return; } + // should we backoff if its enabled, and either the idle or error counter is > the threshold + if (backoffMultiplier > 0 + // either idle or error threshold could be not in use, so check for that and use MAX_VALUE if not in use + && (idleCounter >= (backoffIdleThreshold > 0 ? 
backoffIdleThreshold : Integer.MAX_VALUE)) + || errorCounter >= (backoffErrorThreshold > 0 ? backoffErrorThreshold : Integer.MAX_VALUE)) { + if (backoffCounter++ < backoffMultiplier) { + // yes we should backoff + if (idleCounter > 0) { + LOG.debug("doRun() backoff due subsequent {} idles (backoff at {}/{})", new Object[]{idleCounter, backoffCounter, backoffMultiplier}); + } else { + LOG.debug("doRun() backoff due subsequent {} errors (backoff at {}/{})", new Object[]{errorCounter, backoffCounter, backoffMultiplier}); + } + return; + } else { + // we are finished with backoff so reset counters + idleCounter = 0; + errorCounter = 0; + backoffCounter = 0; + LOG.trace("doRun() backoff finished, resetting counters."); + } + } + int retryCounter = -1; boolean done = false; Throwable cause = null; + int polledMessages = 0; while (!done) { try { @@ -152,7 +184,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R boolean begin = pollStrategy.begin(this, getEndpoint()); if (begin) { retryCounter++; - int polledMessages = poll(); + polledMessages = poll(); if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) { // send an "empty" exchange @@ -198,15 +230,23 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R // let exception handler deal with the caused exception // but suppress this during shutdown as the logs may get flooded with exceptions during shutdown/forced shutdown try { - getExceptionHandler().handleException("Consumer " + this + " failed polling endpoint: " + getEndpoint() + getExceptionHandler().handleException("Consumer " + this + " failed polling endpoint: " + getEndpoint() + ". Will try again at next poll", cause); } catch (Throwable e) { LOG.warn("Error handling exception. This exception will be ignored.", e); } - cause = null; } } + if (cause != null) { + idleCounter = 0; + errorCounter++; + } else { + idleCounter = polledMessages == 0 ? 
++idleCounter : 0; + errorCounter = 0; + } + LOG.trace("doRun() done with idleCounter={}, errorCounter={}", idleCounter, errorCounter); + // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread } @@ -330,11 +370,11 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R public void setStartScheduler(boolean startScheduler) { this.startScheduler = startScheduler; } - + public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) { this.sendEmptyMessageWhenIdle = sendEmptyMessageWhenIdle; } - + public boolean isSendEmptyMessageWhenIdle() { return sendEmptyMessageWhenIdle; } @@ -350,6 +390,34 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R this.greedy = greedy; } + public int getBackoffCounter() { + return backoffCounter; + } + + public int getBackoffMultiplier() { + return backoffMultiplier; + } + + public void setBackoffMultiplier(int backoffMultiplier) { + this.backoffMultiplier = backoffMultiplier; + } + + public int getBackoffIdleThreshold() { + return backoffIdleThreshold; + } + + public void setBackoffIdleThreshold(int backoffIdleThreshold) { + this.backoffIdleThreshold = backoffIdleThreshold; + } + + public int getBackoffErrorThreshold() { + return backoffErrorThreshold; + } + + public void setBackoffErrorThreshold(int backoffErrorThreshold) { + this.backoffErrorThreshold = backoffErrorThreshold; + } + public ScheduledExecutorService getScheduledExecutorService() { return scheduledExecutorService; } @@ -393,6 +461,14 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R protected void doStart() throws Exception { super.doStart(); + // validate that if backoff multiplier is in use, the threshold values is set correclty + if (backoffMultiplier > 0) { + if (backoffIdleThreshold <= 0 && backoffErrorThreshold <= 0) { + throw new IllegalArgumentException("backoffIdleThreshold and/or backoffErrorThreshold must be configured 
to a positive value when using backoffMultiplier"); + } + LOG.debug("Using backoff[multiplier={}, idleThreshold={}, errorThreshold={}] on {}", new Object[]{backoffMultiplier, backoffIdleThreshold, backoffErrorThreshold, getEndpoint()}); + } + if (scheduler == null) { scheduler = new DefaultScheduledPollConsumerScheduler(); } @@ -438,6 +514,12 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R @Override protected void doStop() throws Exception { ServiceHelper.stopService(scheduler); + + // clear counters + backoffCounter = 0; + idleCounter = 0; + errorCounter = 0; + super.doStop(); } http://git-wip-us.apache.org/repos/asf/camel/blob/ebeca860/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java index 9deb698..373508d 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.Component; import org.apache.camel.PollingConsumer; +import org.apache.camel.spi.UriParam; import org.apache.camel.util.IntrospectionSupport; /** @@ -71,6 +72,9 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { Object greedy = options.remove("greedy"); Object scheduledExecutorService = options.remove("scheduledExecutorService"); Object scheduler = options.remove("scheduler"); + Object backoffMultiplier = options.remove("backoffMultiplier"); + Object backoffIdleThreshold = options.remove("backoffIdleThreshold"); + Object backoffErrorThreshold = options.remove("backoffErrorThreshold"); boolean setConsumerProperties = false; // the following is split into two if statements to satisfy the 
checkstyle max complexity constraint @@ -80,7 +84,7 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null || greedy != null || scheduledExecutorService != null) { setConsumerProperties = true; } - if (scheduler != null || !schedulerProperties.isEmpty()) { + if (scheduler != null || !schedulerProperties.isEmpty() || backoffMultiplier != null || backoffIdleThreshold != null || backoffErrorThreshold != null) { setConsumerProperties = true; } @@ -141,6 +145,15 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { if (!schedulerProperties.isEmpty()) { consumerProperties.put("schedulerProperties", schedulerProperties); } + if (backoffMultiplier != null) { + consumerProperties.put("backoffMultiplier", backoffMultiplier); + } + if (backoffIdleThreshold != null) { + consumerProperties.put("backoffIdleThreshold", backoffIdleThreshold); + } + if (backoffErrorThreshold != null) { + consumerProperties.put("backoffErrorThreshold", backoffErrorThreshold); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/ebeca860/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java index ad341a3..64694fd 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java @@ -83,4 +83,19 @@ public class ManagedScheduledPollConsumer extends ManagedConsumer implements Man return getConsumer().getScheduler().getClass().getName(); } + public int getBackoffMultiplier() { + return getConsumer().getBackoffMultiplier(); + } + + 
public int getBackoffIdleThreshold() { + return getConsumer().getBackoffIdleThreshold(); + } + + public int getBackoffErrorThreshold() { + return getConsumer().getBackoffErrorThreshold(); + } + + public int getBackoffCounter() { + return getConsumer().getBackoffCounter(); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/ebeca860/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeBackoffMultiplierTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeBackoffMultiplierTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeBackoffMultiplierTest.java new file mode 100644 index 0000000..45ace58 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeBackoffMultiplierTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.file; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * + */ +public class FileConsumeBackoffMultiplierTest extends ContextTestSupport { + + @Override + protected void setUp() throws Exception { + deleteDirectory("target/files"); + super.setUp(); + template.sendBodyAndHeader("file://target/files", "Hello World", Exchange.FILE_NAME, "report.txt"); + } + + public void testBackoffMultiplier() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World"); + + assertMockEndpointsSatisfied(); + + oneExchangeDone.matchesMockWaitTime(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("file://target/files/?delete=true&backoffMultiplier=4&backoffIdleThreshold=3").convertBodyTo(String.class).to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ebeca860/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerBackoffTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerBackoffTest.java b/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerBackoffTest.java new file mode 100644 index 0000000..95f0b8f --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerBackoffTest.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import org.apache.camel.Consumer; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.spi.PollingConsumerPollStrategy; + +public class ScheduledPollConsumerBackoffTest extends ContextTestSupport { + + private static int commits = 0; + private static int errors = 0; + + public void testBackoffIdle() throws Exception { + + final Endpoint endpoint = getMockEndpoint("mock:foo"); + MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(endpoint, null); + consumer.setBackoffMultiplier(4); + consumer.setBackoffIdleThreshold(2); + + consumer.setPollStrategy(new PollingConsumerPollStrategy() { + public boolean begin(Consumer consumer, Endpoint endpoint) { + return true; + } + + public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) { + commits++; + } + + public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception { + return false; + } + }); + + consumer.start(); + + consumer.run(); + consumer.run(); + assertEquals(2, commits); + // now it should backoff 4 times + consumer.run(); + consumer.run(); + consumer.run(); + consumer.run(); + assertEquals(2, commits); + // and now we poll again + consumer.run(); + consumer.run(); + assertEquals(4, commits); + // now it should backoff 4 times + consumer.run(); + consumer.run(); + 
consumer.run(); + consumer.run(); + assertEquals(4, commits); + consumer.run(); + assertEquals(5, commits); + + consumer.stop(); + } + + public void testBackoffError() throws Exception { + + final Endpoint endpoint = getMockEndpoint("mock:foo"); + final Exception expectedException = new Exception("Hello, I should be thrown on shutdown only!"); + MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(endpoint, expectedException); + consumer.setBackoffMultiplier(4); + consumer.setBackoffErrorThreshold(3); + + consumer.setPollStrategy(new PollingConsumerPollStrategy() { + public boolean begin(Consumer consumer, Endpoint endpoint) { + return true; + } + + public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) { + commits++; + } + + public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception { + errors++; + return false; + } + }); + + consumer.start(); + + consumer.run(); + consumer.run(); + consumer.run(); + assertEquals(3, errors); + // now it should backoff 4 times + consumer.run(); + consumer.run(); + consumer.run(); + consumer.run(); + assertEquals(3, errors); + // and now we poll again + consumer.run(); + consumer.run(); + consumer.run(); + assertEquals(6, errors); + // now it should backoff 4 times + consumer.run(); + consumer.run(); + consumer.run(); + consumer.run(); + assertEquals(6, errors); + + consumer.stop(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ebeca860/camel-core/src/test/java/org/apache/camel/management/ManagedScheduledPollConsumerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedScheduledPollConsumerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedScheduledPollConsumerTest.java index aaec9a5..92fdd17 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedScheduledPollConsumerTest.java +++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedScheduledPollConsumerTest.java @@ -39,7 +39,7 @@ public class ManagedScheduledPollConsumerTest extends ManagementTestSupport { assertTrue("Should be registered", mbeanServer.isRegistered(on)); String uri = (String) mbeanServer.getAttribute(on, "EndpointUri"); - assertEquals("file://target/foo?delay=4000", uri); + assertEquals("file://target/foo?backoffErrorThreshold=3&backoffIdleThreshold=2&backoffMultiplier=4&delay=4000", uri); Long delay = (Long) mbeanServer.getAttribute(on, "Delay"); assertEquals(4000, delay.longValue()); @@ -56,6 +56,18 @@ public class ManagedScheduledPollConsumerTest extends ManagementTestSupport { String timeUnit = (String) mbeanServer.getAttribute(on, "TimeUnit"); assertEquals(TimeUnit.MILLISECONDS.toString(), timeUnit); + Integer backoffMultiplier = (Integer) mbeanServer.getAttribute(on, "BackoffMultiplier"); + assertEquals(4, backoffMultiplier.longValue()); + + Integer backoffCounter = (Integer) mbeanServer.getAttribute(on, "BackoffCounter"); + assertEquals(0, backoffCounter.longValue()); + + Integer backoffIdleThreshold = (Integer) mbeanServer.getAttribute(on, "BackoffIdleThreshold"); + assertEquals(2, backoffIdleThreshold.longValue()); + + Integer backoffErrorThreshold = (Integer) mbeanServer.getAttribute(on, "BackoffErrorThreshold"); + assertEquals(3, backoffErrorThreshold.longValue()); + String routeId = (String) mbeanServer.getAttribute(on, "RouteId"); assertEquals("route1", routeId); @@ -96,7 +108,7 @@ public class ManagedScheduledPollConsumerTest extends ManagementTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { - from("file://target/foo?delay=4000").to("mock:result"); + from("file://target/foo?delay=4000&backoffMultiplier=4&backoffIdleThreshold=2&backoffErrorThreshold=3").to("mock:result"); } }; }