Simplified camel-spring-redis
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3232a4a4 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3232a4a4 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3232a4a4 Branch: refs/heads/master Commit: 3232a4a4df0bdc92f062456a1a4bd4f8df347bbe Parents: 6eab973 Author: Arno Noordover <anoordo...@users.noreply.github.com> Authored: Sat Jun 4 14:10:07 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jun 5 09:30:50 2016 +0200 ---------------------------------------------------------------------- .../component/redis/CommandDispatcher.java | 483 ------------------- .../component/redis/ExchangeConverter.java | 145 ++++++ .../camel/component/redis/RedisClient.java | 43 +- .../camel/component/redis/RedisComponent.java | 6 + .../camel/component/redis/RedisConsumer.java | 2 +- .../camel/component/redis/RedisEndpoint.java | 10 +- .../camel/component/redis/RedisProducer.java | 288 +++++++++-- .../camel/component/redis/RedisHashTest.java | 10 +- .../camel/component/redis/RedisKeyTest.java | 6 +- .../camel/component/redis/RedisListTest.java | 2 +- .../camel/component/redis/RedisSetTest.java | 22 +- .../component/redis/RedisSortedSetTest.java | 14 +- .../camel/component/redis/RedisStringTest.java | 8 +- .../component/redis/RedisTransactionTest.java | 2 +- 14 files changed, 470 insertions(+), 571 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/CommandDispatcher.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/CommandDispatcher.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/CommandDispatcher.java deleted file mode 100755 index 0b56ba5..0000000 --- 
a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/CommandDispatcher.java +++ /dev/null @@ -1,483 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.redis; - -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.apache.camel.Exchange; -import org.apache.camel.Message; -import org.apache.camel.RuntimeExchangeException; - -class CommandDispatcher { - private final RedisConfiguration configuration; - - CommandDispatcher(RedisConfiguration configuration) { - this.configuration = configuration; - } - - // CHECKSTYLE:OFF - public void execute(final RedisClient redisClient, Exchange exchange) { - final Command command = determineCommand(exchange); - switch (command) { - - case PING: - setResult(exchange, redisClient.ping()); - break; - case SET: - redisClient.set(getKey(exchange), getValue(exchange)); - break; - case GET: - setResult(exchange, redisClient.get(getKey(exchange))); - break; - case QUIT: - redisClient.quit(); - break; - case EXISTS: - setResult(exchange, redisClient.exists(getKey(exchange))); - break; - case DEL: - redisClient.del(getKeys(exchange)); - break; - case TYPE: - 
setResult(exchange, redisClient.type(getKey(exchange))); - break; - case KEYS: - setResult(exchange, redisClient.keys(getPattern(exchange))); - break; - case RANDOMKEY: - setResult(exchange, redisClient.randomkey()); - break; - case RENAME: - redisClient.rename(getKey(exchange), getStringValue(exchange)); - break; - case RENAMENX: - setResult(exchange, redisClient.renamenx(getKey(exchange), getStringValue(exchange))); - break; - case EXPIRE: - setResult(exchange, redisClient.expire(getKey(exchange), getTimeout(exchange))); - break; - case EXPIREAT: - setResult(exchange, redisClient.expireat(getKey(exchange), getTimestamp(exchange))); - break; - case PEXPIRE: - setResult(exchange, redisClient.pexpire(getKey(exchange), getTimeout(exchange))); - break; - case PEXPIREAT: - setResult(exchange, redisClient.pexpireat(getKey(exchange), getTimestamp(exchange))); - break; - case TTL: - setResult(exchange, redisClient.ttl(getKey(exchange))); - break; - case MOVE: - setResult(exchange, redisClient.move(getKey(exchange), getDb(exchange))); - break; - case GETSET: - setResult(exchange, redisClient.getset(getKey(exchange), getValue(exchange))); - break; - case MGET: - setResult(exchange, redisClient.mget(getFields(exchange))); - break; - case SETNX: - setResult(exchange, redisClient.setnx(getKey(exchange), getValue(exchange))); - break; - case SETEX: - redisClient.setex(getKey(exchange), getValue(exchange), getTimeout(exchange), TimeUnit.SECONDS); - break; - case MSET: - redisClient.mset(getValuesAsMap(exchange)); - break; - case MSETNX: - redisClient.msetnx(getValuesAsMap(exchange)); - break; - case DECRBY: - setResult(exchange, redisClient.decrby(getKey(exchange), getLongValue(exchange))); - break; - case DECR: - setResult(exchange, redisClient.decr(getKey(exchange))); - break; - case INCRBY: - setResult(exchange, redisClient.incrby(getKey(exchange), getLongValue(exchange))); - break; - case INCR: - setResult(exchange, redisClient.incr(getKey(exchange))); - break; - case 
APPEND: - setResult(exchange, redisClient.append(getKey(exchange), getStringValue(exchange))); - break; - case HSET: - redisClient.hset(getKey(exchange), getField(exchange), getValue(exchange)); - break; - case HGET: - setResult(exchange, redisClient.hget(getKey(exchange), getField(exchange))); - break; - case HSETNX: - setResult(exchange, redisClient.hsetnx(getKey(exchange), getField(exchange), getValue(exchange))); - break; - case HMSET: - redisClient.hmset(getKey(exchange), getValuesAsMap(exchange)); - break; - case HMGET: - setResult(exchange, redisClient.hmget(getKey(exchange), getFields(exchange))); - break; - case HINCRBY: - setResult(exchange, redisClient.hincrBy(getKey(exchange), getField(exchange), getValueAsLong(exchange))); - break; - case HEXISTS: - setResult(exchange, redisClient.hexists(getKey(exchange), getField(exchange))); - break; - case HDEL: - redisClient.hdel(getKey(exchange), getField(exchange)); - break; - case HLEN: - setResult(exchange, redisClient.hlen(getKey(exchange))); - break; - case HKEYS: - setResult(exchange, redisClient.hkeys(getKey(exchange))); - break; - case HVALS: - setResult(exchange, redisClient.hvals(getKey(exchange))); - break; - case HGETALL: - setResult(exchange, redisClient.hgetAll(getKey(exchange))); - break; - case RPUSH: - setResult(exchange, redisClient.rpush(getKey(exchange), getValue(exchange))); - break; - case LPUSH: - setResult(exchange, redisClient.lpush(getKey(exchange), getValue(exchange))); - break; - case LLEN: - setResult(exchange, redisClient.llen(getKey(exchange))); - break; - case LRANGE: - setResult(exchange, redisClient.lrange(getKey(exchange), getStart(exchange), getEnd(exchange))); - break; - case LTRIM: - redisClient.ltrim(getKey(exchange), getStart(exchange), getEnd(exchange)); - break; - case LINDEX: - setResult(exchange, redisClient.lindex(getKey(exchange), getIndex(exchange))); - break; - case LSET: - redisClient.lset(getKey(exchange), getValue(exchange), getIndex(exchange)); - break; - case 
LREM: - setResult(exchange, redisClient.lrem(getKey(exchange), getValue(exchange), getCount(exchange))); - break; - case LPOP: - setResult(exchange, redisClient.lpop(getKey(exchange))); - break; - case RPOP: - setResult(exchange, redisClient.rpop(getKey(exchange))); - break; - case RPOPLPUSH: - setResult(exchange, redisClient.rpoplpush(getKey(exchange), getDestination(exchange))); - break; - case SADD: - setResult(exchange, redisClient.sadd(getKey(exchange), getValue(exchange))); - break; - case SMEMBERS: - setResult(exchange, redisClient.smembers(getKey(exchange))); - break; - case SREM: - setResult(exchange, redisClient.srem(getKey(exchange), getValue(exchange))); - break; - case SPOP: - setResult(exchange, redisClient.spop(getKey(exchange))); - break; - case SMOVE: - setResult(exchange, redisClient.smove(getKey(exchange), getValue(exchange), getDestination(exchange))); - break; - case SCARD: - setResult(exchange, redisClient.scard(getKey(exchange))); - break; - case SISMEMBER: - setResult(exchange, redisClient.sismember(getKey(exchange), getValue(exchange))); - break; - case SINTER: - setResult(exchange, redisClient.sinter(getKey(exchange), getKeys(exchange))); - break; - case SINTERSTORE: - redisClient.sinterstore(getKey(exchange), getKeys(exchange), getDestination(exchange)); - break; - case SUNION: - setResult(exchange, redisClient.sunion(getKey(exchange), getKeys(exchange))); - break; - case SUNIONSTORE: - redisClient.sunionstore(getKey(exchange), getKeys(exchange), getDestination(exchange)); - break; - case SDIFF: - setResult(exchange, redisClient.sdiff(getKey(exchange), getKeys(exchange))); - break; - case SDIFFSTORE: - redisClient.sdiffstore(getKey(exchange), getKeys(exchange), getDestination(exchange)); - break; - case SRANDMEMBER: - setResult(exchange, redisClient.srandmember(getKey(exchange))); - break; - case ZADD: - setResult(exchange, redisClient.zadd(getKey(exchange), getValue(exchange), getScore(exchange))); - break; - case ZRANGE: - 
setResult(exchange, redisClient.zrange(getKey(exchange), getStart(exchange), getEnd(exchange), getWithScore(exchange))); - break; - case ZREM: - setResult(exchange, redisClient.zrem(getKey(exchange), getValue(exchange))); - break; - case ZINCRBY: - setResult(exchange, redisClient.zincrby(getKey(exchange), getValue(exchange), getIncrement(exchange))); - break; - case ZRANK: - setResult(exchange, redisClient.zrank(getKey(exchange), getValue(exchange))); - break; - case ZREVRANK: - setResult(exchange, redisClient.zrevrank(getKey(exchange), getValue(exchange))); - break; - case ZREVRANGE: - setResult(exchange, redisClient.zrevrange(getKey(exchange), getStart(exchange), getEnd(exchange), getWithScore(exchange))); - break; - case ZCARD: - setResult(exchange, redisClient.zcard(getKey(exchange))); - break; - case MULTI: - redisClient.multi(); - break; - case DISCARD: - redisClient.discard(); - break; - case EXEC: - redisClient.exec(); - break; - case WATCH: - redisClient.watch(getKeys(exchange)); - break; - case UNWATCH: - redisClient.unwatch(); - break; - case SORT: - setResult(exchange, redisClient.sort(getKey(exchange))); - break; - case BLPOP: - setResult(exchange, redisClient.blpop(getKey(exchange), getTimeout(exchange))); - break; - case BRPOP: - setResult(exchange, redisClient.brpop(getKey(exchange), getTimeout(exchange))); - break; - case PUBLISH: - redisClient.publish(getChannel(exchange), getMessage(exchange)); - break; - case ZCOUNT: - setResult(exchange, redisClient.zcount(getKey(exchange), getMin(exchange), getMax(exchange))); - break; - case ZRANGEBYSCORE: - setResult(exchange, redisClient.zrangebyscore(getKey(exchange), getMin(exchange), getMax(exchange))); - break; - case ZREVRANGEBYSCORE: - setResult(exchange, redisClient.zrevrangebyscore(getKey(exchange), getMin(exchange), getMax(exchange))); - break; - case ZREMRANGEBYRANK: - redisClient.zremrangebyrank(getKey(exchange), getStart(exchange), getEnd(exchange)); - break; - case ZREMRANGEBYSCORE: - 
redisClient.zremrangebyscore(getKey(exchange), getStart(exchange), getEnd(exchange)); - break; - case ZUNIONSTORE: - redisClient.zunionstore(getKey(exchange), getKeys(exchange), getDestination(exchange)); - break; - case ZINTERSTORE: - redisClient.zinterstore(getKey(exchange), getKeys(exchange), getDestination(exchange)); - break; - case STRLEN: - setResult(exchange, redisClient.strlen(getKey(exchange))); - break; - case PERSIST: - setResult(exchange, redisClient.persist(getKey(exchange))); - break; - case RPUSHX: - setResult(exchange, redisClient.rpushx(getKey(exchange), getValue(exchange))); - break; - case ECHO: - setResult(exchange, redisClient.echo(getStringValue(exchange))); - break; - case LINSERT: - setResult(exchange, redisClient.linsert(getKey(exchange), getValue(exchange), getPivot(exchange), getPosition(exchange))); - break; - case BRPOPLPUSH: - setResult(exchange, redisClient.brpoplpush(getKey(exchange), getDestination(exchange), getTimeout(exchange))); - break; - case SETBIT: - redisClient.setbit(getKey(exchange), getOffset(exchange), getBooleanValue(exchange)); - break; - case GETBIT: - setResult(exchange, redisClient.getbit(getKey(exchange), getOffset(exchange))); - break; - case SETRANGE: - redisClient.setex(getKey(exchange), getValue(exchange), getOffset(exchange)); - break; - case GETRANGE: - setResult(exchange, redisClient.getrange(getKey(exchange), getStart(exchange), getEnd(exchange))); - break; - default: - throw new RuntimeExchangeException("Unsupported command: " + command, exchange); - } - } - // CHECKSTYLE:ON - - private Command determineCommand(Exchange exchange) { - Command command = exchange.getIn().getHeader(RedisConstants.COMMAND, Command.class); - if (command == null) { - command = configuration.getCommand(); - } - if (command == null) { - command = Command.SET; - } - return command; - } - - private static <T> T getInHeaderValue(Exchange exchange, String key, Class<T> aClass) { - return exchange.getIn().getHeader(key, aClass); - } - 
- private void setResult(Exchange exchange, Object result) { - Message message; - if (exchange.getPattern().isOutCapable()) { - message = exchange.getOut(); - message.copyFrom(exchange.getIn()); - } else { - message = exchange.getIn(); - } - message.setBody(result); - } - - public String getDestination(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.DESTINATION, String.class); - } - - private String getChannel(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.CHANNEL, String.class); - } - - private Object getMessage(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.MESSAGE, Object.class); - } - - public Long getIndex(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.INDEX, Long.class); - } - - public String getPivot(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.PIVOT, String.class); - } - - public String getPosition(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.POSITION, String.class); - } - - public Long getCount(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.COUNT, Long.class); - } - - private Long getStart(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.START, Long.class); - } - - private Long getEnd(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.END, Long.class); - } - - private Long getTimeout(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.TIMEOUT, Long.class); - } - - private Long getOffset(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.OFFSET, Long.class); - } - - private Long getValueAsLong(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.VALUE, Long.class); - } - - private Collection<String> getFields(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.FIELDS, Collection.class); - } - - private Map<String, Object> getValuesAsMap(Exchange exchange) { - 
return getInHeaderValue(exchange, RedisConstants.VALUES, Map.class); - } - - private String getKey(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.KEY, String.class); - } - - public Collection<String> getKeys(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.KEYS, Collection.class); - } - - private Object getValue(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.VALUE, Object.class); - } - - private String getStringValue(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.VALUE, String.class); - } - - private Long getLongValue(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.VALUE, Long.class); - } - - private Boolean getBooleanValue(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.VALUE, Boolean.class); - } - - private String getField(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.FIELD, String.class); - } - - public Long getTimestamp(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.TIMESTAMP, Long.class); - } - - public String getPattern(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.PATTERN, String.class); - } - - public Integer getDb(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.DB, Integer.class); - } - - public Double getScore(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.SCORE, Double.class); - } - - public Double getMin(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.MIN, Double.class); - } - - public Double getMax(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.MAX, Double.class); - } - - public Double getIncrement(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.INCREMENT, Double.class); - } - - public Boolean getWithScore(Exchange exchange) { - return getInHeaderValue(exchange, RedisConstants.WITHSCORE, Boolean.class); - } -} 
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/ExchangeConverter.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/ExchangeConverter.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/ExchangeConverter.java new file mode 100644 index 0000000..cdae518 --- /dev/null +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/ExchangeConverter.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.redis; + +import java.util.Collection; +import java.util.Map; + +import org.apache.camel.Exchange; + +class ExchangeConverter { + String getKey(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.KEY, String.class); + } + + String getStringValue(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.VALUE, String.class); + } + + Long getLongValue(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.VALUE, Long.class); + } + + String getDestination(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.DESTINATION, String.class); + } + + String getChannel(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.CHANNEL, String.class); + } + + Object getMessage(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.MESSAGE, Object.class); + } + + Long getIndex(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.INDEX, Long.class); + } + + String getPivot(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.PIVOT, String.class); + } + + String getPosition(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.POSITION, String.class); + } + + Long getCount(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.COUNT, Long.class); + } + + Long getStart(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.START, Long.class); + } + + Long getEnd(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.END, Long.class); + } + + Long getTimeout(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.TIMEOUT, Long.class); + } + + Long getOffset(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.OFFSET, Long.class); + } + + Long getValueAsLong(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.VALUE, Long.class); + } + + Collection<String> getFields(Exchange exchange) { 
+ return getInHeaderValue(exchange, RedisConstants.FIELDS, Collection.class); + } + + Map<String, Object> getValuesAsMap(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.VALUES, Map.class); + } + + Collection<String> getKeys(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.KEYS, Collection.class); + } + + Object getValue(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.VALUE, Object.class); + } + + Boolean getBooleanValue(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.VALUE, Boolean.class); + } + + String getField(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.FIELD, String.class); + } + + Long getTimestamp(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.TIMESTAMP, Long.class); + } + + String getPattern(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.PATTERN, String.class); + } + + Integer getDb(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.DB, Integer.class); + } + + Double getScore(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.SCORE, Double.class); + } + + Double getMin(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.MIN, Double.class); + } + + Double getMax(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.MAX, Double.class); + } + + Double getIncrement(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.INCREMENT, Double.class); + } + + Boolean getWithScore(Exchange exchange) { + return getInHeaderValue(exchange, RedisConstants.WITHSCORE, Boolean.class); + } + + private static <T> T getInHeaderValue(Exchange exchange, String key, Class<T> aClass) { + return exchange.getIn().getHeader(key, aClass); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java 
---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java index 3613d6a..5a06e42 100755 --- a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java @@ -82,12 +82,9 @@ public class RedisClient { } public void quit() { - redisTemplate.execute(new RedisCallback<Object>() { - @Override - public Object doInRedis(RedisConnection connection) throws DataAccessException { - connection.close(); - return null; - } + redisTemplate.execute((RedisCallback<Object>) connection -> { + connection.close(); + return null; }); } @@ -156,21 +153,15 @@ public class RedisClient { } public void setbit(final String key, final Long offset, final Boolean value) { - redisTemplate.execute(new RedisCallback<Object>() { - @Override - public Object doInRedis(RedisConnection connection) throws DataAccessException { - connection.setBit(key.getBytes(), offset, value); - return null; - } + redisTemplate.execute((RedisCallback<Object>) connection -> { + connection.setBit(key.getBytes(), offset, value); + return null; }); } public Boolean getbit(final String key, final Long offset) { - return redisTemplate.execute(new RedisCallback<Boolean>() { - @Override - public Boolean doInRedis(RedisConnection connection) throws DataAccessException { - return connection.getBit(key.getBytes(), offset); - } + return redisTemplate.execute((RedisCallback<Boolean>) connection -> { + return connection.getBit(key.getBytes(), offset); }); } @@ -255,20 +246,14 @@ public class RedisClient { } public String echo(final String value) { - return redisTemplate.execute(new RedisCallback<String>() { - @Override - public String doInRedis(RedisConnection connection) throws 
DataAccessException { - return new String(connection.echo(value.getBytes())); - } + return redisTemplate.execute((RedisCallback<String>) connection -> { + return new String(connection.echo(value.getBytes())); }); } public String ping() { - return redisTemplate.execute(new RedisCallback<String>() { - @Override - public String doInRedis(RedisConnection connection) throws DataAccessException { - return connection.ping(); - } + return redisTemplate.execute((RedisCallback<String>) connection -> { + return connection.ping(); }); } @@ -346,6 +331,10 @@ public class RedisClient { return redisTemplate.opsForList().leftPush(key, value); } + public Long lpushx(String key, Object value) { + return redisTemplate.opsForList().leftPushIfPresent(key, value); + } + public void del(Collection<String> keys) { redisTemplate.delete(keys); } http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisComponent.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisComponent.java index 568dda3..dca4fd7 100755 --- a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisComponent.java +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisComponent.java @@ -26,6 +26,8 @@ import org.apache.camel.impl.UriEndpointComponent; */ public class RedisComponent extends UriEndpointComponent { + private final ExchangeConverter exchangeConverter = new ExchangeConverter(); + public RedisComponent() { super(RedisEndpoint.class); } @@ -48,4 +50,8 @@ public class RedisComponent extends UriEndpointComponent { configuration.setPort(Integer.parseInt(hostAndPort[1])); } } + + public ExchangeConverter getExchangeConverter() { + return exchangeConverter; + } } 
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisConsumer.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisConsumer.java index 84d2338..e0fe095 100755 --- a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisConsumer.java +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisConsumer.java @@ -48,7 +48,7 @@ public class RedisConsumer extends DefaultConsumer implements MessageListener { private Collection<Topic> toTopics(String channels) { String[] channelsArrays = channels.split(","); - List<Topic> topics = new ArrayList<Topic>(); + List<Topic> topics = new ArrayList<>(); for (String channel : channelsArrays) { String name = channel.trim(); if (Command.PSUBSCRIBE.equals(redisConfiguration.getCommand())) { http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java index d3a8294..aaa2d33 100755 --- a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java @@ -38,7 +38,15 @@ public class RedisEndpoint extends DefaultEndpoint { } public Producer createProducer() throws Exception { - return new RedisProducer(this, configuration); + Command defaultCommand = configuration.getCommand(); + if 
(defaultCommand == null) { + defaultCommand = Command.SET; + } + return new RedisProducer(this, + new RedisClient(configuration.getRedisTemplate()), + RedisConstants.COMMAND, + defaultCommand.name(), + ((RedisComponent)getComponent()).getExchangeConverter()); } public Consumer createConsumer(Processor processor) throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java index bfadab5..3df1ccb 100755 --- a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,39 +16,273 @@ */ package org.apache.camel.component.redis; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.apache.camel.Endpoint; import org.apache.camel.Exchange; -import org.apache.camel.impl.DefaultProducer; -import org.apache.camel.util.URISupport; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.impl.HeaderSelectorProducer; -/** - * The Redis producer. 
- */ -public class RedisProducer extends DefaultProducer { - private final RedisClient redisClient; - private final CommandDispatcher commandDispatcher; - - private transient String redisProducerToString; - - public RedisProducer(RedisEndpoint endpoint, RedisConfiguration configuration) { - super(endpoint); - this.redisClient = new RedisClient(configuration.getRedisTemplate()); - this.commandDispatcher = new CommandDispatcher(configuration); - } +final class RedisProducer extends HeaderSelectorProducer { + private final Map<String, Processor> processors = new HashMap<>(); + + public RedisProducer(Endpoint endpoint, + RedisClient redisClient, + String header, + String defaultHeaderValue, + ExchangeConverter exchangeConverter) { + super(endpoint, header, defaultHeaderValue); + //bind key commands + bind(Command.APPEND, wrap(exchange -> redisClient.append(exchangeConverter.getKey(exchange), + exchangeConverter.getStringValue(exchange)))); + bind(Command.DECR, wrap(exchange -> redisClient.decr(exchangeConverter.getKey(exchange)))); + bind(Command.DECRBY, wrap(exchange -> redisClient.decrby(exchangeConverter.getKey(exchange), + exchangeConverter.getLongValue(exchange)))); + bind(Command.GET, wrap(exchange -> redisClient.get(exchangeConverter.getKey(exchange)))); + bind(Command.GETBIT, wrap(exchange -> redisClient.getbit(exchangeConverter.getKey(exchange), + exchangeConverter.getOffset(exchange)))); + bind(Command.GETRANGE, wrap(exchange -> redisClient.getrange(exchangeConverter.getKey(exchange), + exchangeConverter.getStart(exchange), + exchangeConverter.getEnd(exchange)))); + bind(Command.GETSET, wrap(exchange -> redisClient.getset(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.INCR, wrap(exchange -> redisClient.incr(exchangeConverter.getKey(exchange)))); + bind(Command.INCRBY, wrap(exchange -> redisClient.incrby(exchangeConverter.getKey(exchange), + exchangeConverter.getLongValue(exchange)))); + bind(Command.MGET, 
wrap(exchange -> redisClient.mget(exchangeConverter.getFields(exchange)))); + bind(Command.MSET, exchange -> redisClient.mset(exchangeConverter.getValuesAsMap(exchange))); + bind(Command.MSETNX, exchange -> redisClient.msetnx(exchangeConverter.getValuesAsMap(exchange))); + bind(Command.SET, exchange -> redisClient.set(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange))); + bind(Command.SETBIT, exchange -> redisClient.setbit(exchangeConverter.getKey(exchange), + exchangeConverter.getOffset(exchange), + exchangeConverter.getBooleanValue(exchange))); + bind(Command.SETEX, exchange -> redisClient.setex(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange), + exchangeConverter.getTimeout(exchange), + TimeUnit.SECONDS)); + bind(Command.SETNX, wrap(exchange -> redisClient.setnx(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.SETRANGE, exchange -> redisClient.setex(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange), + exchangeConverter.getOffset(exchange))); + bind(Command.STRLEN, wrap(exchange -> redisClient.strlen(exchangeConverter.getKey(exchange)))); + //missing bitcount, bitfield, bitop, bitpos, incrbyfloat, psetex + + //bind sorted set commands + bind(Command.ZADD, wrap(exchange -> redisClient.zadd(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange), + exchangeConverter.getScore(exchange)))); + bind(Command.ZCARD, wrap(exchange -> redisClient.zcard(exchangeConverter.getKey(exchange)))); + bind(Command.ZCOUNT, wrap(exchange -> redisClient.zcount(exchangeConverter.getKey(exchange), + exchangeConverter.getMin(exchange), + exchangeConverter.getMax(exchange)))); + bind(Command.ZINCRBY, wrap(exchange -> redisClient.zincrby(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange), + exchangeConverter.getIncrement(exchange)))); + bind(Command.ZINTERSTORE, exchange -> 
redisClient.zinterstore(exchangeConverter.getKey(exchange), + exchangeConverter.getKeys(exchange), + exchangeConverter.getDestination(exchange))); + bind(Command.ZRANGE, wrap(exchange -> redisClient.zrange(exchangeConverter.getKey(exchange), + exchangeConverter.getStart(exchange), + exchangeConverter.getEnd(exchange), + exchangeConverter.getWithScore(exchange)))); + bind(Command.ZRANGEBYSCORE, wrap(exchange -> redisClient.zrangebyscore(exchangeConverter.getKey(exchange), + exchangeConverter.getMin(exchange), + exchangeConverter.getMax(exchange)))); + bind(Command.ZRANK, wrap(exchange -> redisClient.zrank(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.ZREM, wrap(exchange -> redisClient.zrem(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.ZREMRANGEBYRANK, exchange -> redisClient.zremrangebyrank(exchangeConverter.getKey(exchange), + exchangeConverter.getStart(exchange), + exchangeConverter.getEnd(exchange))); + bind(Command.ZREMRANGEBYSCORE, exchange -> redisClient.zremrangebyscore(exchangeConverter.getKey(exchange), + exchangeConverter.getStart(exchange), + exchangeConverter.getEnd(exchange))); + bind(Command.ZREVRANGE, wrap(exchange -> redisClient.zrevrange(exchangeConverter.getKey(exchange), + exchangeConverter.getStart(exchange), + exchangeConverter.getEnd(exchange), + exchangeConverter.getWithScore(exchange)))); + bind(Command.ZREVRANGEBYSCORE, wrap(exchange -> redisClient.zrevrangebyscore(exchangeConverter.getKey(exchange), + exchangeConverter.getMin(exchange), + exchangeConverter.getMax(exchange)))); + bind(Command.ZREVRANK, wrap(exchange -> redisClient.zrevrank(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.ZUNIONSTORE, exchange -> redisClient.zunionstore(exchangeConverter.getKey(exchange), + exchangeConverter.getKeys(exchange), + exchangeConverter.getDestination(exchange))); + //missing zlexcount, zrangebylex, 
zrevrangebylex, zremrangebylex, zscore, zscan + + //bind sets commands + bind(Command.SADD, wrap(exchange -> redisClient.sadd(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.SCARD, wrap(exchange -> redisClient.scard(exchangeConverter.getKey(exchange)))); + bind(Command.SDIFF, wrap(exchange -> redisClient.sdiff(exchangeConverter.getKey(exchange), + exchangeConverter.getKeys(exchange)))); + bind(Command.SDIFFSTORE, exchange -> redisClient.sdiffstore(exchangeConverter.getKey(exchange), + exchangeConverter.getKeys(exchange), + exchangeConverter.getDestination(exchange))); + bind(Command.SINTER, wrap(exchange -> redisClient.sinter(exchangeConverter.getKey(exchange), + exchangeConverter.getKeys(exchange)))); + bind(Command.SINTERSTORE, exchange -> redisClient.sinterstore(exchangeConverter.getKey(exchange), + exchangeConverter.getKeys(exchange), + exchangeConverter.getDestination(exchange))); + bind(Command.SISMEMBER, wrap(exchange -> redisClient.sismember(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.SMEMBERS, wrap(exchange -> redisClient.smembers(exchangeConverter.getKey(exchange)))); + bind(Command.SMOVE, wrap(exchange -> redisClient.smove(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange), + exchangeConverter.getDestination(exchange)))); + bind(Command.SPOP, wrap(exchange -> redisClient.spop(exchangeConverter.getKey(exchange)))); + bind(Command.SRANDMEMBER, wrap(exchange -> redisClient.srandmember(exchangeConverter.getKey(exchange)))); + bind(Command.SREM, wrap(exchange -> redisClient.srem(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.SUNION, wrap(exchange -> redisClient.sunion(exchangeConverter.getKey(exchange), + exchangeConverter.getKeys(exchange)))); + bind(Command.SUNIONSTORE, exchange -> redisClient.sunionstore(exchangeConverter.getKey(exchange), + exchangeConverter.getKeys(exchange), + 
exchangeConverter.getDestination(exchange))); + //missing command sscan + + //bind pubsub commands + bind(Command.PUBLISH, exchange -> redisClient.publish(exchangeConverter.getChannel(exchange), + exchangeConverter.getMessage(exchange))); + //missing psubscribe, pubsub, punsubscribe, subscribe, unsubscribe + //psubscribe, subscribe are used in consumer + + //create list commands + bind(Command.BLPOP, wrap(exchange -> redisClient.blpop(exchangeConverter.getKey(exchange), + exchangeConverter.getTimeout(exchange)))); + bind(Command.BRPOP, wrap(exchange -> redisClient.brpop(exchangeConverter.getKey(exchange), + exchangeConverter.getTimeout(exchange)))); + bind(Command.BRPOPLPUSH, wrap(exchange -> redisClient.brpoplpush(exchangeConverter.getKey(exchange), + exchangeConverter.getDestination(exchange), + exchangeConverter.getTimeout(exchange)))); + bind(Command.LINDEX, wrap(exchange -> redisClient.lindex(exchangeConverter.getKey(exchange), + exchangeConverter.getIndex(exchange)))); + bind(Command.LINSERT, wrap(exchange -> redisClient.linsert(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange), + exchangeConverter.getPivot(exchange), + exchangeConverter.getPosition(exchange)))); + bind(Command.LLEN, wrap(exchange -> redisClient.llen(exchangeConverter.getKey(exchange)))); + bind(Command.LPOP, wrap(exchange -> redisClient.lpop(exchangeConverter.getKey(exchange)))); + bind(Command.LPUSH, wrap(exchange -> redisClient.lpush(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + //new action + bind(Command.LPUSHX, wrap(exchange -> redisClient.lpushx(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.LRANGE, wrap(exchange -> redisClient.lrange(exchangeConverter.getKey(exchange), + exchangeConverter.getStart(exchange), + exchangeConverter.getEnd(exchange)))); + bind(Command.LREM, wrap(exchange -> redisClient.lrem(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange),
+ exchangeConverter.getCount(exchange)))); + bind(Command.LSET, exchange -> redisClient.lset(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange), + exchangeConverter.getIndex(exchange))); + bind(Command.LTRIM, exchange -> redisClient.ltrim(exchangeConverter.getKey(exchange), + exchangeConverter.getStart(exchange), + exchangeConverter.getEnd(exchange))); + bind(Command.RPOP, exchange -> setResult(exchange, + redisClient.rpop(exchangeConverter.getKey(exchange)))); + bind(Command.RPOPLPUSH, wrap(exchange -> redisClient.rpoplpush(exchangeConverter.getKey(exchange), + exchangeConverter.getDestination(exchange)))); + bind(Command.RPUSH, wrap(exchange -> redisClient.rpush(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.RPUSHX, wrap(exchange -> redisClient.rpushx(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); - public void process(final Exchange exchange) throws Exception { - commandDispatcher.execute(redisClient, exchange); + //bind hashes commands + bind(Command.HDEL, exchange -> redisClient.hdel(exchangeConverter.getKey(exchange), + exchangeConverter.getField(exchange))); + bind(Command.HEXISTS, wrap(exchange -> redisClient.hexists(exchangeConverter.getKey(exchange), + exchangeConverter.getField(exchange)))); + bind(Command.HGET, wrap(exchange -> redisClient.hget(exchangeConverter.getKey(exchange), + exchangeConverter.getField(exchange)))); + bind(Command.HGETALL, wrap(exchange -> redisClient.hgetAll(exchangeConverter.getKey(exchange)))); + bind(Command.HINCRBY, wrap(exchange -> redisClient.hincrBy(exchangeConverter.getKey(exchange), + exchangeConverter.getField(exchange), + exchangeConverter.getValueAsLong(exchange)))); + bind(Command.HKEYS, wrap(exchange -> redisClient.hkeys(exchangeConverter.getKey(exchange)))); + bind(Command.HLEN, wrap(exchange -> redisClient.hlen(exchangeConverter.getKey(exchange)))); + bind(Command.HMGET, wrap(exchange -> 
redisClient.hmget(exchangeConverter.getKey(exchange), + exchangeConverter.getFields(exchange)))); + bind(Command.HMSET, exchange -> redisClient.hmset(exchangeConverter.getKey(exchange), + exchangeConverter.getValuesAsMap(exchange))); + bind(Command.HSET, exchange -> redisClient.hset(exchangeConverter.getKey(exchange), + exchangeConverter.getField(exchange), + exchangeConverter.getValue(exchange))); + bind(Command.HSETNX, wrap(exchange -> redisClient.hsetnx(exchangeConverter.getKey(exchange), + exchangeConverter.getField(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.HVALS, wrap(exchange -> redisClient.hvals(exchangeConverter.getKey(exchange)))); + //missing: hincrbyfloat, hstrlen, hscan + + //bind connection commands + bind(Command.ECHO, wrap(exchange -> redisClient.echo(exchangeConverter.getStringValue(exchange)))); + bind(Command.PING, wrap(exchange -> redisClient.ping())); + bind(Command.QUIT, exchange -> redisClient.quit()); + + //bind key commands + bind(Command.DEL, exchange -> redisClient.del(exchangeConverter.getKeys(exchange))); + bind(Command.EXISTS, wrap(exchange -> redisClient.exists(exchangeConverter.getKey(exchange)))); + bind(Command.EXPIRE, wrap(exchange -> redisClient.expire(exchangeConverter.getKey(exchange), + exchangeConverter.getTimeout(exchange)))); + bind(Command.EXPIREAT, wrap(exchange -> redisClient.expireat(exchangeConverter.getKey(exchange), + exchangeConverter.getTimestamp(exchange)))); + bind(Command.KEYS, wrap(exchange -> redisClient.keys(exchangeConverter.getPattern(exchange)))); + bind(Command.MOVE, wrap(exchange -> redisClient.move(exchangeConverter.getKey(exchange), + exchangeConverter.getDb(exchange)))); + bind(Command.PERSIST, wrap(exchange -> redisClient.persist(exchangeConverter.getKey(exchange)))); + bind(Command.PEXPIRE, wrap(exchange -> redisClient.pexpire(exchangeConverter.getKey(exchange), + exchangeConverter.getTimeout(exchange)))); + bind(Command.PEXPIREAT, wrap(exchange -> 
redisClient.pexpireat(exchangeConverter.getKey(exchange), + exchangeConverter.getTimestamp(exchange)))); + bind(Command.RANDOMKEY, wrap(exchange -> redisClient.randomkey())); + bind(Command.RENAME, exchange -> redisClient.rename(exchangeConverter.getKey(exchange), + exchangeConverter.getStringValue(exchange))); + bind(Command.RENAMENX, wrap(exchange -> redisClient.renamenx(exchangeConverter.getKey(exchange), + exchangeConverter.getStringValue(exchange)))); + bind(Command.SORT, wrap(exchange -> redisClient.sort(exchangeConverter.getKey(exchange)))); + bind(Command.TTL, wrap(exchange -> redisClient.ttl(exchangeConverter.getKey(exchange)))); + bind(Command.TYPE, wrap(exchange -> redisClient.type(exchangeConverter.getKey(exchange)))); + //missing: dump, migrate, object, pttl, restore, wait, scan + + //bind transaction commands + bind(Command.DISCARD, exchange -> redisClient.discard()); + bind(Command.EXEC, exchange -> redisClient.exec()); + bind(Command.MULTI, exchange -> redisClient.multi()); + bind(Command.WATCH, exchange -> redisClient.watch(exchangeConverter.getKeys(exchange))); + bind(Command.UNWATCH, exchange -> redisClient.unwatch()); } - @Override - public RedisEndpoint getEndpoint() { - return (RedisEndpoint)super.getEndpoint(); + private void bind(Command command, Processor processor) { + String cmd = command.name(); + bind(cmd, processor); } - @Override - public String toString() { - if (redisProducerToString == null) { - redisProducerToString = "RedisProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]"; + + private void setResult(Exchange exchange, Object result) { + Message message; + if (exchange.getPattern().isOutCapable()) { + message = exchange.getOut(); + message.copyFrom(exchange.getIn()); + } else { + message = exchange.getIn(); } - return redisProducerToString; + message.setBody(result); } + + public Processor wrap(Function<Exchange, Object> supplier) { + return exchange -> { + Object result = supplier.apply(exchange); + 
setResult(exchange, result); + }; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisHashTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisHashTest.java b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisHashTest.java index 4d14205..da60087 100755 --- a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisHashTest.java +++ b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisHashTest.java @@ -96,7 +96,7 @@ public class RedisHashTest extends RedisTestSupport { @Test public void shouldExecuteHKEYS() throws Exception { - Set<String> fields = new HashSet<String>(Arrays.asList(new String[] {"field1, field2"})); + Set<String> fields = new HashSet<>(Arrays.asList(new String[]{"field1, field2"})); when(hashOperations.keys(anyString())).thenReturn(fields); Object result = sendHeaders( @@ -110,7 +110,7 @@ public class RedisHashTest extends RedisTestSupport { @Test public void shouldExecuteHMSET() throws Exception { - Map<String, String> values = new HashMap<String, String>(); + Map<String, String> values = new HashMap<>(); values.put("field1", "value1"); values.put("field2", "value"); @@ -124,7 +124,7 @@ public class RedisHashTest extends RedisTestSupport { @Test public void shouldExecuteHVALS() throws Exception { - List<String> values = new ArrayList<String>(); + List<String> values = new ArrayList<>(); values.add("val1"); values.add("val2"); @@ -192,7 +192,7 @@ public class RedisHashTest extends RedisTestSupport { @Test public void shouldExecuteHGETALL() throws Exception { - HashMap<String, String> values = new HashMap<String, String>(); + HashMap<String, String> values = new HashMap<>(); values.put("field1", "valu1"); 
when(hashOperations.entries(anyString())).thenReturn(values); @@ -206,7 +206,7 @@ public class RedisHashTest extends RedisTestSupport { @Test public void shouldExecuteHMGET() throws Exception { - List<String> fields = new ArrayList<String>(); + List<String> fields = new ArrayList<>(); fields.add("field1"); when(hashOperations.multiGet(anyString(), anyCollection())).thenReturn(fields); http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisKeyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisKeyTest.java b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisKeyTest.java index a8decce..98e301f 100755 --- a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisKeyTest.java +++ b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisKeyTest.java @@ -57,7 +57,7 @@ public class RedisKeyTest extends RedisTestSupport { @Test public void shouldExecuteDEL() throws Exception { - Collection<String> keys = new HashSet<String>(); + Collection<String> keys = new HashSet<>(); keys.add("key1"); keys.add("key2"); sendHeaders( @@ -109,7 +109,7 @@ public class RedisKeyTest extends RedisTestSupport { @Test public void shouldExecuteKEYS() throws Exception { - Set<String> keys = new HashSet<String>(); + Set<String> keys = new HashSet<>(); keys.add("key1"); keys.add("key2"); when(redisTemplate.keys(anyString())).thenReturn(keys); @@ -211,7 +211,7 @@ public class RedisKeyTest extends RedisTestSupport { @Test public void shouldExecuteSORT() throws Exception { - List<Integer> list = new ArrayList<Integer>(); + List<Integer> list = new ArrayList<>(); list.add(5); when(redisTemplate.sort(any(SortQuery.class))).thenReturn(list); 
http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisListTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisListTest.java b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisListTest.java index 9ab6841..78bef4f 100755 --- a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisListTest.java +++ b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisListTest.java @@ -204,7 +204,7 @@ public class RedisListTest extends RedisTestSupport { @Test public void shouldExecuteLRANGE() throws Exception { - List<String> values = new ArrayList<String>(); + List<String> values = new ArrayList<>(); values.add("value"); when(listOperations.range(anyString(), anyLong(), anyLong())) http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSetTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSetTest.java b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSetTest.java index a528c08..3323d0f 100755 --- a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSetTest.java +++ b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSetTest.java @@ -80,12 +80,12 @@ public class RedisSetTest extends RedisTestSupport { @Test public void shouldExecuteSDIFF() throws Exception { - Set<String> difference = new HashSet<String>(); + Set<String> difference = new HashSet<>(); difference.add("a"); difference.add("b"); when(setOperations.difference(anyString(), anyCollection())).thenReturn(difference); - 
Set<String> keys = new HashSet<String>(); + Set<String> keys = new HashSet<>(); keys.add("key2"); keys.add("key3"); Object result = sendHeaders( @@ -99,7 +99,7 @@ public class RedisSetTest extends RedisTestSupport { @Test public void shouldExecuteSDIFFSTORE() throws Exception { - Set<String> keys = new HashSet<String>(); + Set<String> keys = new HashSet<>(); keys.add("key2"); keys.add("key3"); sendHeaders( @@ -113,12 +113,12 @@ public class RedisSetTest extends RedisTestSupport { @Test public void shouldExecuteSINTER() throws Exception { - Set<String> difference = new HashSet<String>(); + Set<String> difference = new HashSet<>(); difference.add("a"); difference.add("b"); when(setOperations.intersect(anyString(), anyCollection())).thenReturn(difference); - Set<String> keys = new HashSet<String>(); + Set<String> keys = new HashSet<>(); keys.add("key2"); keys.add("key3"); Object result = sendHeaders( @@ -132,7 +132,7 @@ public class RedisSetTest extends RedisTestSupport { @Test public void shouldExecuteSINTERSTORE() throws Exception { - Set<String> keys = new HashSet<String>(); + Set<String> keys = new HashSet<>(); keys.add("key2"); keys.add("key3"); sendHeaders( @@ -159,7 +159,7 @@ public class RedisSetTest extends RedisTestSupport { @Test public void shouldExecuteSMEMBERS() throws Exception { - Set<String> keys = new HashSet<String>(); + Set<String> keys = new HashSet<>(); keys.add("key2"); keys.add("key3"); @@ -220,18 +220,18 @@ public class RedisSetTest extends RedisTestSupport { RedisConstants.VALUE, "value"); verify(setOperations).remove("key", "value"); - assertEquals(Long.valueOf(1), result); + assertEquals(1L, result); } @Test public void shouldExecuteSUNION() throws Exception { - Set<String> resultKeys = new HashSet<String>(); + Set<String> resultKeys = new HashSet<>(); resultKeys.add("key2"); resultKeys.add("key3"); when(setOperations.union(anyString(), anyCollection())).thenReturn(resultKeys); - Set<String> keys = new HashSet<String>(); + Set<String> keys 
= new HashSet<>(); keys.add("key2"); keys.add("key4"); @@ -246,7 +246,7 @@ public class RedisSetTest extends RedisTestSupport { @Test public void shouldExecuteSUNIONSTORE() throws Exception { - Set<String> keys = new HashSet<String>(); + Set<String> keys = new HashSet<>(); keys.add("key2"); keys.add("key4"); http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSortedSetTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSortedSetTest.java b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSortedSetTest.java index 5e209f8..01cb157 100755 --- a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSortedSetTest.java +++ b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisSortedSetTest.java @@ -110,7 +110,7 @@ public class RedisSortedSetTest extends RedisTestSupport { @Test public void shouldExecuteZINTERSTORE() { - Set<String> keys = new HashSet<String>(); + Set<String> keys = new HashSet<>(); keys.add("key2"); keys.add("key3"); sendHeaders( @@ -124,7 +124,7 @@ public class RedisSortedSetTest extends RedisTestSupport { @Test public void shouldExecuteZRANGE() { - Set<String> keys = new HashSet<String>(); + Set<String> keys = new HashSet<>(); keys.add("key2"); keys.add("key3"); when(zSetOperations.range(anyString(), anyLong(), anyLong())).thenReturn(keys); @@ -158,7 +158,7 @@ public class RedisSortedSetTest extends RedisTestSupport { @Test public void shouldExecuteZRANGEBYSCORE() { - Set<String> keys = new HashSet<String>(); + Set<String> keys = new HashSet<>(); keys.add("key2"); keys.add("key3"); when(zSetOperations.rangeByScore(anyString(), anyDouble(), anyDouble())).thenReturn(keys); @@ -197,7 +197,7 @@ public class RedisSortedSetTest extends RedisTestSupport { 
RedisConstants.VALUE, "value"); verify(zSetOperations).remove("key", "value"); - assertEquals(Long.valueOf(1), result); + assertEquals(1L, result); } @@ -226,7 +226,7 @@ public class RedisSortedSetTest extends RedisTestSupport { @Test public void shouldExecuteZREVRANGE() { - Set<String> keys = new HashSet<String>(); + Set<String> keys = new HashSet<>(); keys.add("key2"); keys.add("key3"); when(zSetOperations.reverseRange(anyString(), anyLong(), anyLong())).thenReturn(keys); @@ -260,7 +260,7 @@ public class RedisSortedSetTest extends RedisTestSupport { @Test public void shouldExecuteZREVRANGEBYSCORE() { - Set<String> keys = new HashSet<String>(); + Set<String> keys = new HashSet<>(); keys.add("key2"); keys.add("key3"); when(zSetOperations.reverseRangeByScore(anyString(), anyDouble(), anyDouble())).thenReturn(keys); @@ -291,7 +291,7 @@ public class RedisSortedSetTest extends RedisTestSupport { @Test public void shouldExecuteZUNIONSTORE() { - Set<String> keys = new HashSet<String>(); + Set<String> keys = new HashSet<>(); keys.add("key2"); keys.add("key3"); sendHeaders( http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisStringTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisStringTest.java b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisStringTest.java index a68c3e5..274fe31 100755 --- a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisStringTest.java +++ b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisStringTest.java @@ -231,10 +231,10 @@ public class RedisStringTest extends RedisTestSupport { @Test public void shouldExecuteMGET() throws Exception { - List<String> fields = new ArrayList<String>(); + List<String> fields = new ArrayList<>(); 
fields.add("field1"); - List<String> values = new ArrayList<String>(); + List<String> values = new ArrayList<>(); values.add("value1"); when(valueOperations.multiGet(fields)).thenReturn(values); @@ -250,7 +250,7 @@ public class RedisStringTest extends RedisTestSupport { @Test public void shouldExecuteMSET() throws Exception { - Map<String, String> values = new HashMap<String, String>(); + Map<String, String> values = new HashMap<>(); values.put("field1", "valu1"); sendHeaders( @@ -263,7 +263,7 @@ public class RedisStringTest extends RedisTestSupport { @Test public void shouldExecuteMSETNX() throws Exception { - Map<String, String> values = new HashMap<String, String>(); + Map<String, String> values = new HashMap<>(); values.put("field1", "valu1"); sendHeaders( http://git-wip-us.apache.org/repos/asf/camel/blob/3232a4a4/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisTransactionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisTransactionTest.java b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisTransactionTest.java index b333563..64b1039 100755 --- a/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisTransactionTest.java +++ b/components/camel-spring-redis/src/test/java/org/apache/camel/component/redis/RedisTransactionTest.java @@ -69,7 +69,7 @@ public class RedisTransactionTest extends RedisTestSupport { @Test public void shouldExecuteWATCH() throws Exception { - List<String> keys = new ArrayList<String>(); + List<String> keys = new ArrayList<>(); keys.add("key"); sendHeaders(