Author: davsclaus Date: Thu May 19 11:56:23 2011 New Revision: 1124694 URL: http://svn.apache.org/viewvc?rev=1124694&view=rev Log: CAMEL-3935: Added hazelcast idempotent repository. Thanks to Claus Straube for patch.
Added: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/processor/ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/processor/idempotent/ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/processor/idempotent/hazelcast/ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/processor/idempotent/hazelcast/HazelcastIdempotentRepository.java camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/processor/ camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/processor/idempotent/ camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/processor/idempotent/hazelcast/ camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/processor/idempotent/hazelcast/HazelcastIdempotentRepositoryTest.java Modified: camel/trunk/components/camel-hazelcast/pom.xml Modified: camel/trunk/components/camel-hazelcast/pom.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/pom.xml?rev=1124694&r1=1124693&r2=1124694&view=diff ============================================================================== --- camel/trunk/components/camel-hazelcast/pom.xml (original) +++ camel/trunk/components/camel-hazelcast/pom.xml Thu May 19 11:56:23 2011 @@ -32,7 +32,10 @@ <description>Camel HazelCast based work queue implementation</description> <properties> - <camel.osgi.export.pkg>org.apache.camel.component.hazelcast.*</camel.osgi.export.pkg> + <camel.osgi.export.pkg> + org.apache.camel.component.hazelcast.*;${camel.osgi.version}, + org.apache.camel.processor.idempotent.hazelcast.* + </camel.osgi.export.pkg> </properties> <dependencies> Added: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/processor/idempotent/hazelcast/HazelcastIdempotentRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/processor/idempotent/hazelcast/HazelcastIdempotentRepository.java?rev=1124694&view=auto ============================================================================== --- camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/processor/idempotent/hazelcast/HazelcastIdempotentRepository.java (added) +++ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/processor/idempotent/hazelcast/HazelcastIdempotentRepository.java Thu May 19 11:56:23 2011 @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.idempotent.hazelcast; + +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.IMap; +import org.apache.camel.impl.ServiceSupport; +import org.apache.camel.spi.IdempotentRepository; + +public class HazelcastIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> { + + private String repositoryName; + private IMap<String, Object> repo; + + public HazelcastIdempotentRepository() { + this(HazelcastIdempotentRepository.class.getSimpleName()); + } + + public HazelcastIdempotentRepository(String repositoryName) { + this.repositoryName = repositoryName; + } + + @Override + protected void doStart() throws Exception { + repo = Hazelcast.getMap(repositoryName); + } + + @Override + protected void doStop() throws Exception { + // noop + } + + @Override + public boolean add(String key) { + if (this.contains(key)) { + return false; + } else { + this.repo.put(key, false); + return true; + } + } + + @Override + public boolean confirm(String key) { + if (this.contains(key)) { + this.repo.put(key, true); + return true; + } else { + return false; + } + } + + @Override + public boolean contains(String key) { + return this.repo.containsKey(key); + } + + @Override + public boolean remove(String key) { + if (this.contains(key)) { + this.repo.remove(key); + return true; + } else { + return false; + } + } + + public String getRepositoryName() { + return repositoryName; + } + +} Added: camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/processor/idempotent/hazelcast/HazelcastIdempotentRepositoryTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/processor/idempotent/hazelcast/HazelcastIdempotentRepositoryTest.java?rev=1124694&view=auto ============================================================================== --- camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/processor/idempotent/hazelcast/HazelcastIdempotentRepositoryTest.java (added) +++ camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/processor/idempotent/hazelcast/HazelcastIdempotentRepositoryTest.java Thu May 19 11:56:23 2011 @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.idempotent.hazelcast; + +import java.util.Iterator; + +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.IMap; +import com.hazelcast.core.Instance; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class HazelcastIdempotentRepositoryTest extends CamelTestSupport { + + private IMap<String, Boolean> cache = Hazelcast.getMap("myRepo"); + private HazelcastIdempotentRepository repo; + + private String key01 = "123"; + private String key02 = "456"; + + public void setUp() throws Exception { + repo = new HazelcastIdempotentRepository("myRepo"); + super.setUp(); + cache.clear(); + } + + public void tearDown() throws Exception { + super.tearDown(); + cache.clear(); + } + + @Test + public void testAdd() throws Exception { + // add first key + assertTrue(repo.add(key01)); + assertTrue(cache.containsKey(key01)); + + // try to add the same key again + assertFalse(repo.add(key01)); + assertEquals(1, cache.size()); + + // try to add an other one + assertTrue(repo.add(key02)); + assertEquals(2, cache.size()); + } + + @Test + public void testConfirm() throws Exception { + // add first key and confirm + assertTrue(repo.add(key01)); + assertTrue(repo.confirm(key01)); + + // try to confirm a key that isn't there + assertFalse(repo.confirm(key02)); + } + + @Test + public void testContains() throws Exception { + assertFalse(repo.contains(key01)); + + // add key and check again + assertTrue(repo.add(key01)); + assertTrue(repo.contains(key01)); + + } + + @Test + public void testRemove() throws Exception { + // add key to remove + assertTrue(repo.add(key01)); + assertEquals(1, cache.size()); + + // remove key + assertTrue(repo.remove(key01)); + assertEquals(0, cache.size()); + + // try to remove a key that isn't there + assertFalse(repo.remove(key02)); + } + + @Test + public void testRepositoryInRoute() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:out"); + mock.expectedBodiesReceived("a", "b"); + // c is a duplicate + + // should be started + assertEquals("Should be started", true, repo.getStatus().isStarted()); + + // send 3 message with one duplicated key (key01) + template.sendBodyAndHeader("direct://in", "a", "messageId", key01); + template.sendBodyAndHeader("direct://in", "b", "messageId", key02); + template.sendBodyAndHeader("direct://in", "c", "messageId", key01); + + assertMockEndpointsSatisfied(); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct://in") + .idempotentConsumer(header("messageId"), repo) + .to("mock://out"); + } + }; + } + +}