...
So in your Java code you can do
Code Block |
Endpoint endpoint = context.getEndpoint("activemq:my.queue");
PollingConsumer consumer = endpoint.createPollingConsumer();
Exchange exchange = consumer.receive();
|
...
Method name |
Description |
receive() |
Waits until a message is available and then returns it; potentially blocking forever |
receive(long) |
Attempts to receive a message exchange, waiting up to the given timeout and returning null if no message exchange could be received within the time available |
receiveNoWait() |
Attempts to receive a message exchange immediately without waiting and returning null if a message exchange is not available yet |
EventDrivenPollingConsumer Options
The EventDrivePollingConsumer (the default implementation) supports the following options:
Wiki Markup |
{div:class=confluenceTableSmall}
|| Option || Default || Description ||
| {{pollingConsumerQueueSize}} | {{1000}} | *Camel 2.14/2.13.1:* The queue size for the internal handoff queue between the polling consumer, and producers sending data into the queue. |
| {{pollingConsumerBlockWhenFull}} | {{true}} | *Camel 2.14/2.13.1:* Whether to block any producer if the internal queue is full. |
{div}
|
Notice that some Camel Components has their own implementation of PollingConsumer
and therefore do not support the options above.
You can configure these options in endpoints URIs, such as shown below:
Code Block |
Endpoint endpoint = context.getEndpoint("file:inbox?pollingConsumerQueueSize=50");
PollingConsumer consumer = endpoint.createPollingConsumer();
Exchange exchange = consumer.receive(5000);
|
ConsumerTemplate
The ConsumerTemplate
is a template much like Spring's JmsTemplate or JdbcTemplate supporting the Polling Consumer EIP. With the template you can consume Exchanges from an Endpoint.
The template supports the 3 operations above, but also including convenient methods for returning the body, etc consumeBody
.
The example from above using ConsumerTemplate is:
Code Block |
Exchange exchange = consumerTemplate.receive("activemq:my.queue");
|
Or to extract and get the body you can do:
Code Block |
Object body = consumerTemplate.receiveBody("activemq:my.queue");
|
And you can provide the body type as a parameter and have it returned as the type:
Code Block |
String body = consumerTemplate.receiveBody("activemq:my.queue", String.class);
|
You get hold of a ConsumerTemplate
from the CamelContext
with the createConsumerTemplate
operation:
Code Block |
ConsumerTemplate consumer = context.createConsumerTemplate();
|
...
...
For example to let a FTP consumer backoff if its becoming idle for a while you can do:
Code Block |
from("ftp://myserver?username=foo&passowrd=secret?delete=true&delay=5s&backoffMultiplier=6&backoffIdleThreshold=5")
.to("bean:processFile");
|
...
For instance if we want to provide a retry feature to a scheduled consumer we can implement the PollingConsumerPollStrategy
method and put the retry logic in the rollback
method. Lets just retry up till 3 times:
Code Block |
|
public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception {
if (retryCounter < 3) {
// return true to tell Camel that it should retry the poll immediately
return true;
}
// okay we give up do not retry anymore
return false;
}
|
Notice that we are given the Consumer
as a parameter. We could use this to restart the consumer as we can invoke stop and start:
Code Block |
// error occurred lets restart the consumer, that could maybe resolve the issue
consumer.stop();
consumer.start();
|
...
To configure an Endpoint to use a custom PollingConsumerPollStrategy
you use the option pollStrategy
. For example in the file consumer below we want to use our custom strategy defined in the Registry with the bean id myPoll
:
Code Block |
from("file://inbox/?pollStrategy=#myPoll").to("activemq:queue:inbox")
|
Include Page |
|
See Also