Polling Consumer
Camel supports implementing the Polling Consumer from the EIP patterns using the PollingConsumer interface which can be created via the Endpoint.createPollingConsumer() method.
![]()
So in your Java code you can do
Endpoint endpoint = context.getEndpoint("activemq:my.queue");
PollingConsumer consumer = endpoint.createPollingConsumer();
Exchange exchange = consumer.receive();
The ConsumerTemplate (discussed below) is also available.
There are 3 main polling methods on PollingConsumer
Method name |
Description |
receive() |
Waits until a message is available and then returns it; potentially blocking forever |
receive(long) |
Attempts to receive a message exchange, waiting up to the given timeout and returning null if no message exchange could be received within the time available |
receiveNoWait() |
Attempts to receive a message exchange immediately without waiting and returning null if a message exchange is not available yet |
ConsumerTemplate
The ConsumerTemplate is a template much like Spring's JmsTemplate or JdbcTemplate supporting the Polling Consumer EIP. With the template you can consume Exchanges from an Endpoint.
The template supports the 3 operations above, but also including convenient methods for returning the body, etc consumeBody.
The example from above using ConsumerTemplate is:
Exchange exchange = consumerTemplate.receive("activemq:my.queue");
Or to extract and get the body you can do:
Object body = consumerTemplate.receiveBody("activemq:my.queue");
And you can provide the body type as a parameter and have it returned as the type:
String body = consumerTemplate.receiveBody("activemq:my.queue", String.class);
You get hold of a ConsumerTemplate from the CamelContext with the createConsumerTemplate operation:
ConsumerTemplate consumer = context.createConsumerTemplate();
Using ConsumerTemplate with Spring DSL
With the Spring DSL we can declare the consumer in the CamelContext with the consumerTemplate tag, just like the ProducerTemplate. The example below illustrates this:
<camelContext xmlns="http://camel.apache.org/schema/spring">
<template id="producer"/>
<consumerTemplate id="consumer"/>
<route>
<from uri="seda:foo"/>
<to id="result" uri="mock:result"/>
</route>
</camelContext>
Then we can get leverage Spring to inject the ConsumerTemplate in our java class. The code below is part of an unit test but it shows how the consumer and producer can work together.
@ContextConfiguration
public class SpringConsumerTemplateTest extends AbstractJUnit38SpringContextTests {
@Autowired
private ProducerTemplate producer;
@Autowired
private ConsumerTemplate consumer;
@EndpointInject(ref = "result")
private MockEndpoint mock;
public void testConsumeTemplate() throws Exception {
mock.expectedBodiesReceived("Hello World");
producer.sendBody("seda:start", "Hello World");
String body = consumer.receiveBody("seda:start", String.class);
assertEquals("Hello World", body);
producer.sendBody("seda:foo", body);
mock.assertIsSatisfied();
}
}
Timer based polling consumer
In this sample we use a Timer to schedule a route to be started every 5th second and invoke our bean MyCoolBean where we implement the business logic for the Polling Consumer. Here we want to consume all messages from a JMS queue, process the message and send them to the next queue.
First we setup our route as:
MyCoolBean cool = new MyCoolBean();
cool.setProducer(template);
cool.setConsumer(consumer);
from("timer:).bean(cool, "someBusinessLogic");
from("activemq:queue.foo").to("mock:result");
And then we have out logic in our bean:
public static class MyCoolBean {
private int count;
private ConsumerTemplate consumer;
private ProducerTemplate producer;
public void setConsumer(ConsumerTemplate consumer) {
this.consumer = consumer;
}
public void setProducer(ProducerTemplate producer) {
this.producer = producer;
}
public void someBusinessLogic() {
while (true) {
String msg = consumer.receiveBody("activemq:queue.inbox", 3000, String.class);
if (msg == null) {
break;
}
msg = "Hello " + msg;
producer.sendBodyAndHeader("activemq:queue.foo", msg, "number", count++);
}
}
}
Scheduled Poll Components
Quite a few inbound Camel endpoints use a scheduled poll pattern to receive messages and push them through the Camel processing routes. That is to say externally from the client the endpoint appears to use an Event Driven Consumer but internally a scheduled poll is used to monitor some kind of state or resource and then fire message exchanges.
Since this a such a common pattern, polling components can extend the ScheduledPollConsumer base class which makes it simpler to implement this pattern.
There is also the Quartz Component which provides scheduled delivery of messages using the Quartz enterprise scheduler.
For more details see:
ScheduledPollConsumer Options
The ScheduledPollConsumer supports the following options:
Option |
Default |
Description |
pollStrategy |
|
A pluggable org.apache.camel.PollingConsumerPollStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel. In other words the error occurred while the polling was gathering information, for instance access to a file network failed so Camel cannot access it to scan for files. The default implementation will log the caused exception at WARN level and ignore it. |
sendEmptyMessageWhenIdle |
false |
Camel 2.9: If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead. |
startScheduler |
true |
Whether the scheduler should be auto started. |
initialDelay |
1000 |
Milliseconds before polling the file/directory starts. |
delay |
500 |
Milliseconds before the next poll of the file/directory. |
useFixedDelay |
|
Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details. In Camel 2.7.x or older the default value is false. From Camel 2.8 onwards the default value is true. |
timeUnit |
TimeUnit.MILLISECONDS |
time unit for initialDelay and delay options. |
runLoggingLevel |
TRACE |
Camel 2.8: The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that. |
About error handling and scheduled polling consumers
ScheduledPollConsumer is scheduled based and its run method is invoked periodically based on schedule settings. But errors can also occur when a poll is being executed. For instance if Camel should poll a file network, and this network resource is not available then a java.io.IOException could occur. As this error happens before any Exchange has been created and prepared for routing, then the regular Error handling in Camel does not apply. So what does the consumer do then? Well the exception is propagated back to the run method where its handled. Camel will by default log the exception at WARN level and then ignore it. At next schedule the error could have been resolved and thus being able to poll the endpoint successfully.
Controlling the error handling using PollingConsumerPollStrategy
org.apache.camel.PollingConsumerPollStrategy is a pluggable strategy that you can configure on the ScheduledPollConsumer. The default implementation org.apache.camel.impl.DefaultPollingConsumerPollStrategy will log the caused exception at WARN level and then ignore this issue.
The strategy interface provides the following 3 methods
- begin
- void begin(Consumer consumer, Endpoint endpoint)
- begin (Camel 2.3)
- boolean begin(Consumer consumer, Endpoint endpoint)
- commit
- void commit(Consumer consumer, Endpoint endpoint)
- commit (Camel 2.6)
- void commit(Consumer consumer, Endpoint endpoint, int polledMessages)
- rollback
- boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception
In Camel 2.3 onwards the begin method returns a boolean which indicates whether or not to skipping polling. So you can implement your custom logic and return false if you do not want to poll this time.
In Camel 2.6 onwards the commit method has an additional parameter containing the number of message that was actually polled. For example if there was no messages polled, the value would be zero, and you can react accordingly.
The most interesting is the rollback as it allows you do handle the caused exception and decide what to do.
For instance if we want to provide a retry feature to a scheduled consumer we can implement the PollingConsumerPollStrategy method and put the retry logic in the rollback method. Lets just retry up till 3 times:
public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception {
if (retryCounter < 3) {
return true;
}
return false;
}
Notice that we are given the Consumer as a parameter. We could use this to restart the consumer as we can invoke stop and start:
consumer.stop();
consumer.start();
Notice: If you implement the begin operation make sure to avoid throwing exceptions as in such a case the poll operation is not invoked and Camel will invoke the rollback directly.
Configuring an Endpoint to use PollingConsumerPollStrategy
To configure an Endpoint to use a custom PollingConsumerPollStrategy you use the option pollStrategy. For example in the file consumer below we want to use our custom strategy defined in the Registry with the bean id myPoll:
from("file:).to("activemq:queue:inbox")
Using This Pattern
If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.
See Also