ADDED Redis String based idempotent repository to support expiration, refs https://issues.apache.org/jira/browse/CAMEL-9023
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cadb1bda Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cadb1bda Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cadb1bda Branch: refs/heads/master Commit: cadb1bda94f2f4db026122716a4bcfcde2a5aa8c Parents: e84b46d Author: Marco Zapletal <[email protected]> Authored: Thu Sep 24 16:52:16 2015 +0200 Committer: Claus Ibsen <[email protected]> Committed: Fri Sep 25 11:21:49 2015 +0200 ---------------------------------------------------------------------- .../RedisStringIdempotentRepository.java | 91 ++++++++++++++++++++ parent/pom.xml | 2 +- 2 files changed, 92 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/cadb1bda/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/processor/idempotent/RedisStringIdempotentRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/processor/idempotent/RedisStringIdempotentRepository.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/processor/idempotent/RedisStringIdempotentRepository.java new file mode 100644 index 0000000..04e585a --- /dev/null +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/processor/idempotent/RedisStringIdempotentRepository.java @@ -0,0 +1,91 @@ +package org.apache.camel.component.redis.processor.idempotent; + +import org.springframework.dao.DataAccessException; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.RedisCallback; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.ScanOptions; +import org.springframework.data.redis.core.ValueOperations; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class RedisStringIdempotentRepository extends RedisIdempotentRepository { + + private final ValueOperations<String, String> valueOperations; + + /* + The expiry time frame for the item in seconds + */ + private Long expiry; + + public RedisStringIdempotentRepository( + RedisTemplate<String, String> redisTemplate, + String processorName) { + super(redisTemplate, processorName); + this.valueOperations = redisTemplate.opsForValue(); + } + + @Override + public boolean contains(String key) { + String value = valueOperations.get(createRedisKey(key)); + if (value != null) { + return true; + } else { + return false; + } + } + + @Override + public boolean add(String key) { + boolean added = valueOperations.setIfAbsent(createRedisKey(key), key); + if (expiry != null) { + valueOperations.getOperations().expire(createRedisKey(key), expiry, TimeUnit.SECONDS); + } + return added; + } + + @Override + public boolean remove(String key) { + valueOperations.getOperations().delete(createRedisKey(key)); + return true; + } + + public void clear() { + valueOperations.getOperations().execute(new RedisCallback<List<byte[]>>() { + @Override + public List<byte[]> doInRedis(RedisConnection connection) throws DataAccessException { + List<byte[]> binaryKeys = new ArrayList<>(); + Cursor<byte[]> + cursor = + connection.scan(ScanOptions.scanOptions().match("*" + createRedisKey("*")).build()); + + while (cursor.hasNext()) { + byte[] key = cursor.next(); + binaryKeys.add(key); + } + if (binaryKeys.size() > 0) { + connection.del(binaryKeys.toArray(new byte[][]{})); + } + return binaryKeys; + } + }); + } + + public String createRedisKey(String key) { + return new StringBuilder(getProcessorName()).append(":").append(key).toString(); + } + + public Long getExpiry() { + return expiry; + } + + /** + * Exire all newly added items after the given number of seconds + */ + public void setExpiry(Long expiry) { + this.expiry = expiry; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/cadb1bda/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index 5e99df9..cc1763a 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -456,7 +456,7 @@ <spring-boot-version>1.2.6.RELEASE</spring-boot-version> <spring-castor-bundle-version>1.2.0</spring-castor-bundle-version> <spring-data-commons-version>1.6.5.RELEASE</spring-data-commons-version> - <spring-data-redis-version>1.3.6.RELEASE</spring-data-redis-version> + <spring-data-redis-version>1.6.0.RELEASE</spring-data-redis-version> <spring-integration-version>2.2.6.RELEASE</spring-integration-version> <spring-javaconfig-version>1.0.0-20090215</spring-javaconfig-version> <spring-ldap-version>2.0.3.RELEASE</spring-ldap-version>
