Repository: camel Updated Branches: refs/heads/master 52cede016 -> bac9ddd4b
CAMEL-11475 - Camel-Caffeine: Add idempotent repository to component Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/222f287b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/222f287b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/222f287b Branch: refs/heads/master Commit: 222f287b9d8deb01c65568a06847874aba3d6879 Parents: 52cede0 Author: Andrea Cosentino <anco...@gmail.com> Authored: Fri Jun 30 10:37:32 2017 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Fri Jun 30 10:52:33 2017 +0200 ---------------------------------------------------------------------- .../CaffeineIdempotentRepository.java | 97 +++++++++++++ .../CaffeineIdempotentRepositoryTest.java | 138 +++++++++++++++++++ 2 files changed, 235 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/222f287b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/idempotent/CaffeineIdempotentRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/idempotent/CaffeineIdempotentRepository.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/idempotent/CaffeineIdempotentRepository.java new file mode 100644 index 0000000..b1f9880 --- /dev/null +++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/idempotent/CaffeineIdempotentRepository.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.caffeine.processor.idempotent; + +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.spi.IdempotentRepository; +import org.apache.camel.support.ServiceSupport; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + +@ManagedResource(description = "Caffeine based message id repository") +public class CaffeineIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> { + + private String cacheName; + private Cache<String, Boolean> cache; + + public CaffeineIdempotentRepository() { + this(CaffeineIdempotentRepository.class.getSimpleName()); + } + + public CaffeineIdempotentRepository(String repositoryName) { + this.cacheName = repositoryName; + } + + @ManagedAttribute(description = "The processor name") + public String getCacheName() { + return cacheName; + } + + @Override + @ManagedOperation(description = "Adds the key to the store") + public boolean add(String key) { + if (cache.asMap().containsKey(key)) { + return false; + } else { + cache.put(key, true); + return true; + } + } + + @Override + public boolean confirm(String key) { + return cache.asMap().containsKey(key); + } + + @Override + @ManagedOperation(description = "Does the store contain the given key") + public boolean contains(String key) { + return cache.asMap().containsKey(key); + } + + @Override + @ManagedOperation(description = "Remove the key from the store") + public boolean remove(String key) { + cache.invalidate(key); + return true; + } + + @Override + @ManagedOperation(description = "Clear the store") + public void clear() { + cache.invalidateAll(); + } + + @Override + protected void doStart() throws Exception { + if (cache == null) { + Caffeine<Object, Object> builder = Caffeine.newBuilder(); + cache = builder.build(); + } + } + + protected Cache getCache() { + return this.cache; + } + + @Override + protected void doStop() throws Exception { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/222f287b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/idempotent/CaffeineIdempotentRepositoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/idempotent/CaffeineIdempotentRepositoryTest.java b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/idempotent/CaffeineIdempotentRepositoryTest.java new file mode 100644 index 0000000..e2eef89 --- /dev/null +++ b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/idempotent/CaffeineIdempotentRepositoryTest.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.caffeine.processor.idempotent; + +import java.util.UUID; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import com.github.benmanes.caffeine.cache.Cache; + +public class CaffeineIdempotentRepositoryTest extends CamelTestSupport { + + private CaffeineIdempotentRepository repo; + private Cache<String, Boolean> cache; + private String key01; + private String key02; + + @Override + protected void doPreSetup() throws Exception { + super.doPreSetup(); + + repo = new CaffeineIdempotentRepository("test"); + + key01 = generateRandomString(); + key02 = generateRandomString(); + } + + @Test + public void testAdd() throws Exception { + // add first key + assertTrue(repo.add(key01)); + assertTrue(repo.getCache().asMap().containsKey(key01)); + + // try to add the same key again + assertFalse(repo.add(key01)); + + // try to add an other one + assertTrue(repo.add(key02)); + assertTrue(repo.getCache().asMap().containsKey(key02)); + } + + @Test + public void testConfirm() throws Exception { + // add first key and confirm + assertTrue(repo.add(key01)); + assertTrue(repo.confirm(key01)); + + // try to confirm a key that isn't there + assertFalse(repo.confirm(key02)); + } + + @Test + public void testContains() throws Exception { + assertFalse(repo.contains(key01)); + + // add key and check again + assertTrue(repo.add(key01)); + assertTrue(repo.contains(key01)); + + } + + @Test + public void testRemove() throws Exception { + // add key to remove + assertTrue(repo.add(key01)); + assertTrue(repo.add(key02)); + assertTrue(repo.getCache().asMap().containsKey(key01)); + assertTrue(repo.getCache().asMap().containsKey(key02)); + + // clear repo + repo.clear(); + assertFalse(repo.getCache().asMap().containsKey(key01)); + assertFalse(repo.getCache().asMap().containsKey(key02)); + } + + @Test + public void testClear() throws Exception { + // add key to remove + assertTrue(repo.add(key01)); + assertTrue(repo.confirm(key01)); + + // remove key + assertTrue(repo.remove(key01)); + assertFalse(repo.confirm(key01)); + + // try to remove a key that isn't there + repo.remove(key02); + } + + @Test + public void testRepositoryInRoute() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:out"); + mock.expectedBodiesReceived("a", "b"); + // c is a duplicate + + // should be started + assertEquals("Should be started", true, repo.getStatus().isStarted()); + + // send 3 message with one duplicated key (key01) + template.sendBodyAndHeader("direct://in", "a", "messageId", key01); + template.sendBodyAndHeader("direct://in", "b", "messageId", key02); + template.sendBodyAndHeader("direct://in", "c", "messageId", key01); + + assertMockEndpointsSatisfied(); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct://in") + .idempotentConsumer(header("messageId"), repo) + .to("mock://out"); + } + }; + } + + protected static String generateRandomString() { + return UUID.randomUUID().toString(); + } +}