CAMEL-8368: Adding new scheduler component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cdadacb8 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cdadacb8 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cdadacb8 Branch: refs/heads/master Commit: cdadacb80b61d9f44611e807dc063151bf80d328 Parents: 0478551 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Feb 21 16:00:21 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Feb 21 16:00:21 2015 +0100 ---------------------------------------------------------------------- .../component/scheduler/SchedulerConsumer.java | 52 ++++++++++++++++---- 1 file changed, 43 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/cdadacb8/camel-core/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java b/camel-core/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java index 49b3b2d..a18b031 100644 --- a/camel-core/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java @@ -16,28 +16,61 @@ */ package org.apache.camel.component.scheduler; -import org.apache.camel.Endpoint; +import java.util.Date; + +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.ScheduledPollConsumer; public class SchedulerConsumer extends ScheduledPollConsumer { - public SchedulerConsumer(Endpoint endpoint, Processor processor) { + public SchedulerConsumer(SchedulerEndpoint endpoint, Processor processor) { super(endpoint, processor); } @Override + public SchedulerEndpoint getEndpoint() { + return (SchedulerEndpoint) super.getEndpoint(); + } + + @Override protected int poll() throws Exception { - Exchange exchange = getEndpoint().createExchange(); - try { - getProcessor().process(exchange); - } catch (Exception e) { - exchange.setException(e); + return sendTimerExchange(); + } + + protected int sendTimerExchange() { + final Exchange exchange = getEndpoint().createExchange(); + exchange.setProperty(Exchange.TIMER_NAME, getEndpoint().getName()); + + Date now = new Date(); + exchange.setProperty(Exchange.TIMER_FIRED_TIME, now); + + if (log.isTraceEnabled()) { + log.trace("Timer {} is firing", getEndpoint().getName()); } - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); + if (!getEndpoint().isSynchronous()) { + getAsyncProcessor().process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + // handle any thrown exception + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); + } + } + }); + } else { + try { + getProcessor().process(exchange); + } catch (Exception e) { + exchange.setException(e); + } + + // handle any thrown exception + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); + } } // a property can be used to control if the scheduler polled a message or not @@ -47,4 +80,5 @@ public class SchedulerConsumer extends ScheduledPollConsumer { return polled ? 1 : 0; } + }