Idempotent Consumer
Page edited by Claus Ibsen
The Idempotent Consumer from the EIP patterns is used to filter out duplicate messages.

This pattern is implemented using the IdempotentConsumer class. It uses an Expression to calculate a unique message ID string for a given message exchange. This ID is then looked up in the IdempotentRepository to see whether it has been seen before. If it has, the message is consumed (that is, dropped without further processing); if it has not, the message is processed and the ID is added to the repository.

The Idempotent Consumer essentially acts like a Message Filter to filter out duplicates.

Camel adds the message ID to the repository eagerly, so that duplicates are also detected for Exchanges that are still in progress.

Camel provides the following Idempotent Consumer implementations:
Options

The Idempotent Consumer has the following options:
Using the Fluent Builders

The following example uses the header myMessageId to filter out duplicates:

    RouteBuilder builder = new RouteBuilder() {
        public void configure() {
            errorHandler(deadLetterChannel("mock:error"));

            from("seda:a")
                .idempotentConsumer(header("myMessageId"),
                    MemoryIdempotentRepository.memoryIdempotentRepository(200))
                .to("seda:b");
        }
    };

The example above uses an in-memory IdempotentRepository, which can easily run out of memory and does not work in a clustered environment. You might therefore prefer the JPA based implementation, which uses a database to store the IDs of the messages that have been processed:

    from("direct:start").idempotentConsumer(
            header("messageId"),
            jpaMessageIdRepository(lookup(JpaTemplate.class), PROCESSOR_NAME)
        ).to("mock:result");

In this example we use the header messageId to filter out duplicates, and the processor name passed as PROCESSOR_NAME (myProcessorName) to indicate which Message ID Repository to use. This name is important because the same message could be processed by many different processors, so each may require its own logical Message ID Repository.

For further examples of this pattern in use you could look at the junit test case.

Spring XML example

The following example uses the header messageId to filter out duplicates:

    <!-- repository for the idempotent consumer -->
    <bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <idempotentConsumer messageIdRepositoryRef="myRepo">
                <!-- use the messageId header as key for identifying duplicate messages -->
                <header>messageId</header>
                <!-- if not a duplicate send it to this mock endpoint -->
                <to uri="mock:result"/>
            </idempotentConsumer>
        </route>
    </camelContext>

How to handle duplicate messages in the route

Available as of Camel 2.8

You can now set the skipDuplicate option to false, which instructs the idempotent consumer to route duplicate messages as well.
However, each duplicate message is marked as a duplicate by having a property on the Exchange set to true. We can leverage this by using a Content Based Router or Message Filter to detect and handle duplicate messages.

In the following example we use the Message Filter to send the message to a duplicate endpoint and then stop routing that message.

Filter duplicate messages

    from("direct:start")
        // instruct the idempotent consumer not to skip duplicates, as we will filter them ourselves
        .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false)
        .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
            // filter out duplicate messages by sending them somewhere else and then stop
            .to("mock:duplicate")
            .stop()
        .end()
        // and here we process only new messages (no duplicates)
        .to("mock:result");

The same example in XML DSL would be:

Filter duplicate messages

    <!-- idempotent repository, just use a memory based one for testing -->
    <bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <!-- we do not want to skip any duplicate messages -->
            <idempotentConsumer messageIdRepositoryRef="myRepo" skipDuplicate="false">
                <!-- use the messageId header as key for identifying duplicate messages -->
                <header>messageId</header>
                <!-- we will handle duplicate messages using a filter -->
                <filter>
                    <!-- the filter only reacts to duplicate messages, i.e. when this property is set on the Exchange -->
                    <property>CamelDuplicateMessage</property>
                    <!-- and send the message to this mock, as it is part of a unit test -->
                    <!-- but you can of course do anything, as it is part of the route -->
                    <to uri="mock:duplicate"/>
                    <!-- and then stop -->
                    <stop/>
                </filter>
                <!-- here we route only new messages -->
                <to uri="mock:result"/>
            </idempotentConsumer>
        </route>
    </camelContext>

How to handle duplicate messages in a clustered environment with a data grid

Available as of Camel 2.8

If you are running Camel in a clustered environment, an in-memory idempotent repository does not work (see above). You can either set up a central database or use the idempotent consumer implementation based on the Hazelcast data grid. Hazelcast finds the nodes over multicast (the default; it can be configured for tcp-ip instead) and automatically creates a map based repository:

    HazelcastIdempotentRepository idempotentRepo = new HazelcastIdempotentRepository("myrepo");

    from("direct:in").idempotentConsumer(header("messageId"), idempotentRepo).to("mock:out");

You have to define how long the repository should hold each message ID (by default an ID is never deleted). To avoid running out of memory, you should create an eviction strategy in the Hazelcast configuration. For additional information see camel-hazelcast.

See this little tutorial on how to set up such an idempotent repository on two cluster nodes using Apache Karaf.

Using This Pattern

If you would like to use this EIP Pattern then please read the Getting Started; you may also find the Architecture useful, particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.
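
As a rough illustration of the lookup-and-filter behaviour described at the top of this page, here is a minimal sketch in plain Java. It is not Camel's IdempotentConsumer and uses no Camel classes: the class name IdempotentSketch and its methods isNew and filter are hypothetical, a concurrent set stands in for the IdempotentRepository, and a java.util.function.Function stands in for the Expression that computes the message ID.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical illustration of the pattern; this is NOT Camel's IdempotentConsumer.
final class IdempotentSketch<M> {

    // Stands in for the repository: remembers every message ID seen so far.
    private final Set<String> seenIds = ConcurrentHashMap.newKeySet();

    // Stands in for the expression that computes the unique message ID.
    private final Function<M, String> idExpression;

    IdempotentSketch(Function<M, String> idExpression) {
        this.idExpression = idExpression;
    }

    // Returns true for a first-time message and false for a duplicate.
    // Set.add registers the ID before any processing happens (eager add)
    // and reports whether the ID was already present.
    boolean isNew(M message) {
        return seenIds.add(idExpression.apply(message));
    }

    // Returns only the messages that would be passed on; duplicates are dropped.
    List<M> filter(List<M> messages) {
        List<M> passed = new ArrayList<>();
        for (M message : messages) {
            if (isNew(message)) {
                passed.add(message);
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        // In this toy example the message itself serves as its ID.
        IdempotentSketch<String> consumer = new IdempotentSketch<>(id -> id);
        List<String> out = consumer.filter(Arrays.asList("a", "b", "a", "c", "b"));
        System.out.println(out); // prints [a, b, c]
    }
}
```

Because the set in this sketch is never pruned, it grows without bound, which is the same concern raised above for in-memory repositories and for the Hazelcast eviction strategy.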
- [CONF] Apache Camel > Idempotent Consumer confluence