Repository: camel Updated Branches: refs/heads/master 6eab973bd -> 31b1a73d5
CS-fixes Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/31b1a73d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/31b1a73d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/31b1a73d Branch: refs/heads/master Commit: 31b1a73d5e0d02b79861fc0b9d21c961d93c62f6 Parents: add3d66 Author: Arno Noordover <anoordo...@users.noreply.github.com> Authored: Sat Jun 4 18:03:26 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jun 5 09:30:50 2016 +0200 ---------------------------------------------------------------------- .../redis/AbstractRedisProcessorCreator.java | 56 ++++ .../redis/AllRedisProcessorsCreator.java | 44 ++++ .../redis/ConnectionRedisProcessorsCreator.java | 34 +++ .../redis/HashesRedisProcessorsCreator.java | 54 ++++ .../redis/KeysRedisProcessorsCreator.java | 53 ++++ .../redis/ListsRedisProcessorsCreator.java | 70 +++++ .../redis/PubSubRedisProcessorsCreator.java | 34 +++ .../camel/component/redis/RedisClient.java | 2 - .../camel/component/redis/RedisEndpoint.java | 6 +- .../component/redis/RedisProcessorsCreator.java | 25 ++ .../camel/component/redis/RedisProducer.java | 256 +------------------ .../redis/SetsRedisProcessorsCreator.java | 59 +++++ .../redis/SortedSetsRedisProcessorsCreator.java | 73 ++++++ .../redis/StringsRedisProcessorsCreator.java | 66 +++++ .../TransactionRedisProcessorsCreator.java | 35 +++ 15 files changed, 610 insertions(+), 257 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/31b1a73d/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/AbstractRedisProcessorCreator.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/AbstractRedisProcessorCreator.java 
b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/AbstractRedisProcessorCreator.java new file mode 100644 index 0000000..3a5f8d9 --- /dev/null +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/AbstractRedisProcessorCreator.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.redis; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; + +public abstract class AbstractRedisProcessorCreator implements RedisProcessorsCreator { + protected Map<Command, Processor> result = new HashMap<>(); + + @Override + public Map<Command, Processor> getRedisProcessors() { + return result; + } + + protected void bind(Command command, Processor processor) { + result.put(command, processor); + } + + private void setResult(Exchange exchange, Object result) { + Message message; + if (exchange.getPattern().isOutCapable()) { + message = exchange.getOut(); + message.copyFrom(exchange.getIn()); + } else { + message = exchange.getIn(); + } + message.setBody(result); + } + + protected Processor wrap(Function<Exchange, Object> supplier) { + return exchange -> { + Object result = supplier.apply(exchange); + setResult(exchange, result); + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/31b1a73d/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/AllRedisProcessorsCreator.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/AllRedisProcessorsCreator.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/AllRedisProcessorsCreator.java new file mode 100644 index 0000000..261124d --- /dev/null +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/AllRedisProcessorsCreator.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.redis; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.Processor; + +public class AllRedisProcessorsCreator implements RedisProcessorsCreator { + + private final Map<Command, Processor> processors = new HashMap<>(); + + public AllRedisProcessorsCreator(RedisClient redisClient, ExchangeConverter exchangeConverter) { + this.processors.putAll(new ConnectionRedisProcessorsCreator().getProcessors(redisClient, exchangeConverter)); + this.processors.putAll(new HashesRedisProcessorsCreator().getProcessors(redisClient, exchangeConverter)); + this.processors.putAll(new KeysRedisProcessorsCreator().getProcessors(redisClient, exchangeConverter)); + this.processors.putAll(new ListsRedisProcessorsCreator().getProcessors(redisClient, exchangeConverter)); + this.processors.putAll(new PubSubRedisProcessorsCreator().getProcessors(redisClient, exchangeConverter)); + this.processors.putAll(new SetsRedisProcessorsCreator().getProcessors(redisClient, exchangeConverter)); + this.processors.putAll(new SortedSetsRedisProcessorsCreator().getProcessors(redisClient, exchangeConverter)); + this.processors.putAll(new StringsRedisProcessorsCreator().getProcessors(redisClient, exchangeConverter)); + this.processors.putAll(new TransactionRedisProcessorsCreator().getProcessors(redisClient, exchangeConverter)); + } + + public Map<Command, Processor> 
getRedisProcessors() { + return processors; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/31b1a73d/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/ConnectionRedisProcessorsCreator.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/ConnectionRedisProcessorsCreator.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/ConnectionRedisProcessorsCreator.java new file mode 100644 index 0000000..dc17750 --- /dev/null +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/ConnectionRedisProcessorsCreator.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.redis; + +import java.util.Map; + +import org.apache.camel.Processor; + +public class ConnectionRedisProcessorsCreator extends AbstractRedisProcessorCreator { + + Map<Command, Processor> getProcessors(RedisClient redisClient, ExchangeConverter exchangeConverter) { + bind(Command.ECHO, wrap(exchange -> redisClient.echo(exchangeConverter.getStringValue(exchange)))); + bind(Command.PING, wrap(exchange -> redisClient.ping())); + bind(Command.QUIT, exchange -> redisClient.quit()); + + return result; + } + + +} http://git-wip-us.apache.org/repos/asf/camel/blob/31b1a73d/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/HashesRedisProcessorsCreator.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/HashesRedisProcessorsCreator.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/HashesRedisProcessorsCreator.java new file mode 100644 index 0000000..c9731cd --- /dev/null +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/HashesRedisProcessorsCreator.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.redis; + +import java.util.Map; + +import org.apache.camel.Processor; + +public final class HashesRedisProcessorsCreator extends AbstractRedisProcessorCreator { + + Map<Command, Processor> getProcessors(RedisClient redisClient, ExchangeConverter exchangeConverter) { + bind(Command.HDEL, exchange -> redisClient.hdel(exchangeConverter.getKey(exchange), + exchangeConverter.getField(exchange))); + bind(Command.HEXISTS, wrap(exchange -> redisClient.hexists(exchangeConverter.getKey(exchange), + exchangeConverter.getField(exchange)))); + bind(Command.HGET, wrap(exchange -> redisClient.hget(exchangeConverter.getKey(exchange), + exchangeConverter.getField(exchange)))); + bind(Command.HGETALL, wrap(exchange -> redisClient.hgetAll(exchangeConverter.getKey(exchange)))); + bind(Command.HINCRBY, wrap(exchange -> redisClient.hincrBy(exchangeConverter.getKey(exchange), + exchangeConverter.getField(exchange), + exchangeConverter.getValueAsLong(exchange)))); + bind(Command.HKEYS, wrap(exchange -> redisClient.hkeys(exchangeConverter.getKey(exchange)))); + bind(Command.HLEN, wrap(exchange -> redisClient.hlen(exchangeConverter.getKey(exchange)))); + bind(Command.HMGET, wrap(exchange -> redisClient.hmget(exchangeConverter.getKey(exchange), + exchangeConverter.getFields(exchange)))); + bind(Command.HMSET, exchange -> redisClient.hmset(exchangeConverter.getKey(exchange), + exchangeConverter.getValuesAsMap(exchange))); + bind(Command.HSET, exchange -> redisClient.hset(exchangeConverter.getKey(exchange), + exchangeConverter.getField(exchange), + exchangeConverter.getValue(exchange))); + bind(Command.HSETNX, wrap(exchange -> redisClient.hsetnx(exchangeConverter.getKey(exchange), + exchangeConverter.getField(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.HVALS, wrap(exchange -> 
redisClient.hvals(exchangeConverter.getKey(exchange)))); + //missing: hincrbyfloat, hstrlen, hscan + + return result; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/31b1a73d/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/KeysRedisProcessorsCreator.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/KeysRedisProcessorsCreator.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/KeysRedisProcessorsCreator.java new file mode 100644 index 0000000..5e91ffc --- /dev/null +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/KeysRedisProcessorsCreator.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.redis; + +import java.util.Map; + +import org.apache.camel.Processor; + +public final class KeysRedisProcessorsCreator extends AbstractRedisProcessorCreator { + + Map<Command, Processor> getProcessors(RedisClient redisClient, ExchangeConverter exchangeConverter) { + bind(Command.DEL, exchange -> redisClient.del(exchangeConverter.getKeys(exchange))); + bind(Command.EXISTS, wrap(exchange -> redisClient.exists(exchangeConverter.getKey(exchange)))); + bind(Command.EXPIRE, wrap(exchange -> redisClient.expire(exchangeConverter.getKey(exchange), + exchangeConverter.getTimeout(exchange)))); + bind(Command.EXPIREAT, wrap(exchange -> redisClient.expireat(exchangeConverter.getKey(exchange), + exchangeConverter.getTimestamp(exchange)))); + bind(Command.KEYS, wrap(exchange -> redisClient.keys(exchangeConverter.getPattern(exchange)))); + bind(Command.MOVE, wrap(exchange -> redisClient.move(exchangeConverter.getKey(exchange), + exchangeConverter.getDb(exchange)))); + bind(Command.PERSIST, wrap(exchange -> redisClient.persist(exchangeConverter.getKey(exchange)))); + bind(Command.PEXPIRE, wrap(exchange -> redisClient.pexpire(exchangeConverter.getKey(exchange), + exchangeConverter.getTimeout(exchange)))); + bind(Command.PEXPIREAT, wrap(exchange -> redisClient.pexpireat(exchangeConverter.getKey(exchange), + exchangeConverter.getTimestamp(exchange)))); + bind(Command.RANDOMKEY, wrap(exchange -> redisClient.randomkey())); + bind(Command.RENAME, exchange -> redisClient.rename(exchangeConverter.getKey(exchange), + exchangeConverter.getStringValue(exchange))); + bind(Command.RENAMENX, wrap(exchange -> redisClient.renamenx(exchangeConverter.getKey(exchange), + exchangeConverter.getStringValue(exchange)))); + bind(Command.SORT, wrap(exchange -> redisClient.sort(exchangeConverter.getKey(exchange)))); + bind(Command.TTL, wrap(exchange -> redisClient.ttl(exchangeConverter.getKey(exchange)))); + bind(Command.TYPE, wrap(exchange -> 
redisClient.type(exchangeConverter.getKey(exchange)))); + //missing: dump, migrate, object, pttl, restore, wait, scan + + return result; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/31b1a73d/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/ListsRedisProcessorsCreator.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/ListsRedisProcessorsCreator.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/ListsRedisProcessorsCreator.java new file mode 100644 index 0000000..332bace --- /dev/null +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/ListsRedisProcessorsCreator.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.redis; + +import java.util.Map; + +import org.apache.camel.Processor; + +public final class ListsRedisProcessorsCreator extends AbstractRedisProcessorCreator { + + Map<Command, Processor> getProcessors(RedisClient redisClient, ExchangeConverter exchangeConverter) { + bind(Command.BLPOP, wrap(exchange -> redisClient.blpop(exchangeConverter.getKey(exchange), + exchangeConverter.getTimeout(exchange)))); + bind(Command.BRPOP, wrap(exchange -> redisClient.brpop(exchangeConverter.getKey(exchange), + exchangeConverter.getTimeout(exchange)))); + bind(Command.BRPOPLPUSH, wrap(exchange -> redisClient.brpoplpush(exchangeConverter.getKey(exchange), + exchangeConverter.getDestination(exchange), + exchangeConverter.getTimeout(exchange)))); + bind(Command.LINDEX, wrap(exchange -> redisClient.lindex(exchangeConverter.getKey(exchange), + exchangeConverter.getIndex(exchange)))); + bind(Command.LINSERT, wrap(exchange -> redisClient.linsert(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange), + exchangeConverter.getPivot(exchange), + exchangeConverter.getPosition(exchange)))); + bind(Command.LLEN, wrap(exchange -> redisClient.llen(exchangeConverter.getKey(exchange)))); + bind(Command.LPOP, wrap(exchange -> redisClient.lpop(exchangeConverter.getKey(exchange)))); + bind(Command.LPUSH, wrap(exchange -> redisClient.lpush(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + //nieuwe actie + bind(Command.LPUSHX, wrap(exchange -> redisClient.lpushx(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.LRANGE, wrap(exchange -> redisClient.lrange(exchangeConverter.getKey(exchange), + exchangeConverter.getStart(exchange), + exchangeConverter.getEnd(exchange)))); + bind(Command.LREM, wrap(exchange -> redisClient.lrem(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange), + exchangeConverter.getCount(exchange)))); + bind(Command.LSET, exchange -> 
redisClient.lset(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange), + exchangeConverter.getIndex(exchange))); + bind(Command.LTRIM, exchange -> redisClient.ltrim(exchangeConverter.getKey(exchange), + exchangeConverter.getStart(exchange), + exchangeConverter.getEnd(exchange))); + bind(Command.RPOP, wrap(exchange -> + redisClient.rpop(exchangeConverter.getKey(exchange)))); + bind(Command.RPOPLPUSH, wrap(exchange -> redisClient.rpoplpush(exchangeConverter.getKey(exchange), + exchangeConverter.getDestination(exchange)))); + bind(Command.RPUSH, wrap(exchange -> redisClient.rpush(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.RPUSHX, wrap(exchange -> redisClient.rpushx(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + + return result; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/31b1a73d/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/PubSubRedisProcessorsCreator.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/PubSubRedisProcessorsCreator.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/PubSubRedisProcessorsCreator.java new file mode 100644 index 0000000..4fc2394 --- /dev/null +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/PubSubRedisProcessorsCreator.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.redis; + +import java.util.Map; + +import org.apache.camel.Processor; + +public final class PubSubRedisProcessorsCreator extends AbstractRedisProcessorCreator { + + Map<Command, Processor> getProcessors(RedisClient redisClient, ExchangeConverter exchangeConverter) { + bind(Command.PUBLISH, exchange -> redisClient.publish(exchangeConverter.getChannel(exchange), + exchangeConverter.getMessage(exchange))); + //missing psubscribe, pubsub, punsubscribe, subscribe, unsubscribe + //psubscribe, subscribe are used in consumer + + return result; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/31b1a73d/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java index 5a06e42..4e2b2d0 100755 --- a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisClient.java @@ -23,8 +23,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import org.springframework.dao.DataAccessException; -import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisCallback; import 
org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.query.SortQuery; http://git-wip-us.apache.org/repos/asf/camel/blob/31b1a73d/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java index aaa2d33..3d6833e 100755 --- a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisEndpoint.java @@ -31,10 +31,13 @@ public class RedisEndpoint extends DefaultEndpoint { @UriParam private RedisConfiguration configuration; + private RedisProcessorsCreator redisProcessorsCreator; public RedisEndpoint(String uri, RedisComponent component, RedisConfiguration configuration) { super(uri, component); this.configuration = configuration; + redisProcessorsCreator = new AllRedisProcessorsCreator(new RedisClient(configuration.getRedisTemplate()), + ((RedisComponent)getComponent()).getExchangeConverter()); } public Producer createProducer() throws Exception { @@ -43,10 +46,9 @@ public class RedisEndpoint extends DefaultEndpoint { defaultCommand = Command.SET; } return new RedisProducer(this, - new RedisClient(configuration.getRedisTemplate()), RedisConstants.COMMAND, defaultCommand.name(), - ((RedisComponent)getComponent()).getExchangeConverter()); + redisProcessorsCreator); } public Consumer createConsumer(Processor processor) throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/31b1a73d/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProcessorsCreator.java ---------------------------------------------------------------------- diff --git 
a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProcessorsCreator.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProcessorsCreator.java new file mode 100644 index 0000000..a783075 --- /dev/null +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProcessorsCreator.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.redis; + +import java.util.Map; + +import org.apache.camel.Processor; + +public interface RedisProcessorsCreator { + Map<Command, Processor> getRedisProcessors(); +} http://git-wip-us.apache.org/repos/asf/camel/blob/31b1a73d/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java index 990963a..b52bf90 100755 --- a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisProducer.java @@ -16,273 +16,23 @@ */ package org.apache.camel.component.redis; -import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; import org.apache.camel.Endpoint; -import org.apache.camel.Exchange; -import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.impl.HeaderSelectorProducer; final class RedisProducer extends HeaderSelectorProducer { - private final Map<String, Processor> processors = new HashMap<>(); RedisProducer(Endpoint endpoint, - RedisClient redisClient, String header, String defaultHeaderValue, - ExchangeConverter exchangeConverter) { + RedisProcessorsCreator redisProcessorsCreator) { super(endpoint, header, defaultHeaderValue); - //bind key commands - bind(Command.APPEND, wrap(exchange -> redisClient.append(exchangeConverter.getKey(exchange), - exchangeConverter.getStringValue(exchange)))); - bind(Command.DECR, wrap(exchange -> redisClient.decr(exchangeConverter.getKey(exchange)))); - bind(Command.DECRBY, wrap(exchange -> redisClient.decrby(exchangeConverter.getKey(exchange), - 
exchangeConverter.getLongValue(exchange)))); - bind(Command.GET, wrap(exchange -> redisClient.get(exchangeConverter.getKey(exchange)))); - bind(Command.GETBIT, wrap(exchange -> redisClient.getbit(exchangeConverter.getKey(exchange), - exchangeConverter.getOffset(exchange)))); - bind(Command.GETRANGE, wrap(exchange -> redisClient.getrange(exchangeConverter.getKey(exchange), - exchangeConverter.getStart(exchange), - exchangeConverter.getEnd(exchange)))); - bind(Command.GETSET, wrap(exchange -> redisClient.getset(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange)))); - bind(Command.INCR, wrap(exchange -> redisClient.incr(exchangeConverter.getKey(exchange)))); - bind(Command.INCRBY, wrap(exchange -> redisClient.incrby(exchangeConverter.getKey(exchange), - exchangeConverter.getLongValue(exchange)))); - bind(Command.MGET, wrap(exchange -> redisClient.mget(exchangeConverter.getFields(exchange)))); - bind(Command.MSET, exchange -> redisClient.mset(exchangeConverter.getValuesAsMap(exchange))); - bind(Command.MSETNX, exchange -> redisClient.msetnx(exchangeConverter.getValuesAsMap(exchange))); - bind(Command.SET, exchange -> redisClient.set(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange))); - bind(Command.SETBIT, exchange -> redisClient.setbit(exchangeConverter.getKey(exchange), - exchangeConverter.getOffset(exchange), - exchangeConverter.getBooleanValue(exchange))); - bind(Command.SETEX, exchange -> redisClient.setex(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange), - exchangeConverter.getTimeout(exchange), - TimeUnit.SECONDS)); - bind(Command.SETNX, wrap(exchange -> redisClient.setnx(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange)))); - bind(Command.SETRANGE, exchange -> redisClient.setex(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange), - exchangeConverter.getOffset(exchange))); - bind(Command.STRLEN, wrap(exchange -> 
redisClient.strlen(exchangeConverter.getKey(exchange)))); - //missing bitcount, bitfield, bitop, bitpos, incrbyfloat, psetex - //bind sorted set commands - bind(Command.ZADD, wrap(exchange -> redisClient.zadd(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange), - exchangeConverter.getScore(exchange)))); - bind(Command.ZCARD, wrap(exchange -> redisClient.zcard(exchangeConverter.getKey(exchange)))); - bind(Command.ZCOUNT, wrap(exchange -> redisClient.zcount(exchangeConverter.getKey(exchange), - exchangeConverter.getMin(exchange), - exchangeConverter.getMax(exchange)))); - bind(Command.ZINCRBY, wrap(exchange -> redisClient.zincrby(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange), - exchangeConverter.getIncrement(exchange)))); - bind(Command.ZINTERSTORE, exchange -> redisClient.zinterstore(exchangeConverter.getKey(exchange), - exchangeConverter.getKeys(exchange), - exchangeConverter.getDestination(exchange))); - bind(Command.ZRANGE, wrap(exchange -> redisClient.zrange(exchangeConverter.getKey(exchange), - exchangeConverter.getStart(exchange), - exchangeConverter.getEnd(exchange), - exchangeConverter.getWithScore(exchange)))); - bind(Command.ZRANGEBYSCORE, wrap(exchange -> redisClient.zrangebyscore(exchangeConverter.getKey(exchange), - exchangeConverter.getMin(exchange), - exchangeConverter.getMax(exchange)))); - bind(Command.ZRANK, wrap(exchange -> redisClient.zrank(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange)))); - bind(Command.ZREM, wrap(exchange -> redisClient.zrem(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange)))); - bind(Command.ZREMRANGEBYRANK, exchange -> redisClient.zremrangebyrank(exchangeConverter.getKey(exchange), - exchangeConverter.getStart(exchange), - exchangeConverter.getEnd(exchange))); - bind(Command.ZREMRANGEBYSCORE, exchange -> redisClient.zremrangebyscore(exchangeConverter.getKey(exchange), - exchangeConverter.getStart(exchange), - 
exchangeConverter.getEnd(exchange))); - bind(Command.ZREVRANGE, wrap(exchange -> redisClient.zrevrange(exchangeConverter.getKey(exchange), - exchangeConverter.getStart(exchange), - exchangeConverter.getEnd(exchange), - exchangeConverter.getWithScore(exchange)))); - bind(Command.ZREVRANGEBYSCORE, wrap(exchange -> redisClient.zrevrangebyscore(exchangeConverter.getKey(exchange), - exchangeConverter.getMin(exchange), - exchangeConverter.getMax(exchange)))); - bind(Command.ZREVRANK, wrap(exchange -> redisClient.zrevrank(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange)))); - bind(Command.ZUNIONSTORE, exchange -> redisClient.zunionstore(exchangeConverter.getKey(exchange), - exchangeConverter.getKeys(exchange), - exchangeConverter.getDestination(exchange))); - //missing zlexcount, zrangebylex, zrevrangebylex, zremrangebylex, zscore, zscan - - //bind sets commands - bind(Command.SADD, wrap(exchange -> redisClient.sadd(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange)))); - bind(Command.SCARD, wrap(exchange -> redisClient.scard(exchangeConverter.getKey(exchange)))); - bind(Command.SDIFF, wrap(exchange -> redisClient.sdiff(exchangeConverter.getKey(exchange), - exchangeConverter.getKeys(exchange)))); - bind(Command.SDIFFSTORE, exchange -> redisClient.sdiffstore(exchangeConverter.getKey(exchange), - exchangeConverter.getKeys(exchange), - exchangeConverter.getDestination(exchange))); - bind(Command.SINTER, wrap(exchange -> redisClient.sinter(exchangeConverter.getKey(exchange), - exchangeConverter.getKeys(exchange)))); - bind(Command.SINTERSTORE, exchange -> redisClient.sinterstore(exchangeConverter.getKey(exchange), - exchangeConverter.getKeys(exchange), - exchangeConverter.getDestination(exchange))); - bind(Command.SISMEMBER, wrap(exchange -> redisClient.sismember(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange)))); - bind(Command.SMEMBERS, wrap(exchange -> 
redisClient.smembers(exchangeConverter.getKey(exchange)))); - bind(Command.SMOVE, wrap(exchange -> redisClient.smove(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange), - exchangeConverter.getDestination(exchange)))); - bind(Command.SPOP, wrap(exchange -> redisClient.spop(exchangeConverter.getKey(exchange)))); - bind(Command.SRANDMEMBER, wrap(exchange -> redisClient.srandmember(exchangeConverter.getKey(exchange)))); - bind(Command.SREM, wrap(exchange -> redisClient.srem(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange)))); - bind(Command.SUNION, wrap(exchange -> redisClient.sunion(exchangeConverter.getKey(exchange), - exchangeConverter.getKeys(exchange)))); - bind(Command.SUNIONSTORE, exchange -> redisClient.sunionstore(exchangeConverter.getKey(exchange), - exchangeConverter.getKeys(exchange), - exchangeConverter.getDestination(exchange))); - //missing command sscan - - //bind pubsub commands - bind(Command.PUBLISH, exchange -> redisClient.publish(exchangeConverter.getChannel(exchange), - exchangeConverter.getMessage(exchange))); - //missing psubscribe, pubsub, punsubscribe, subscribe, unsubscribe - //psubscribe, subscribe are used in consumer - - //create list commands - bind(Command.BLPOP, wrap(exchange -> redisClient.blpop(exchangeConverter.getKey(exchange), - exchangeConverter.getTimeout(exchange)))); - bind(Command.BRPOP, wrap(exchange -> redisClient.brpop(exchangeConverter.getKey(exchange), - exchangeConverter.getTimeout(exchange)))); - bind(Command.BRPOPLPUSH, wrap(exchange -> redisClient.brpoplpush(exchangeConverter.getKey(exchange), - exchangeConverter.getDestination(exchange), - exchangeConverter.getTimeout(exchange)))); - bind(Command.LINDEX, wrap(exchange -> redisClient.lindex(exchangeConverter.getKey(exchange), - exchangeConverter.getIndex(exchange)))); - bind(Command.LINSERT, wrap(exchange -> redisClient.linsert(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange), - 
exchangeConverter.getPivot(exchange), - exchangeConverter.getPosition(exchange)))); - bind(Command.LLEN, wrap(exchange -> redisClient.llen(exchangeConverter.getKey(exchange)))); - bind(Command.LPOP, wrap(exchange -> redisClient.lpop(exchangeConverter.getKey(exchange)))); - bind(Command.LPUSH, wrap(exchange -> redisClient.lpush(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange)))); - //nieuwe actie - bind(Command.LPUSHX, wrap(exchange -> redisClient.lpushx(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange)))); - bind(Command.LRANGE, wrap(exchange -> redisClient.lrange(exchangeConverter.getKey(exchange), - exchangeConverter.getStart(exchange), - exchangeConverter.getEnd(exchange)))); - bind(Command.LREM, wrap(exchange -> redisClient.lrem(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange), - exchangeConverter.getCount(exchange)))); - bind(Command.LSET, exchange -> redisClient.lset(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange), - exchangeConverter.getIndex(exchange))); - bind(Command.LTRIM, exchange -> redisClient.ltrim(exchangeConverter.getKey(exchange), - exchangeConverter.getStart(exchange), - exchangeConverter.getEnd(exchange))); - bind(Command.RPOP, exchange -> setResult(exchange, - redisClient.rpop(exchangeConverter.getKey(exchange)))); - bind(Command.RPOPLPUSH, wrap(exchange -> redisClient.rpoplpush(exchangeConverter.getKey(exchange), - exchangeConverter.getDestination(exchange)))); - bind(Command.RPUSH, wrap(exchange -> redisClient.rpush(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange)))); - bind(Command.RPUSHX, wrap(exchange -> redisClient.rpushx(exchangeConverter.getKey(exchange), - exchangeConverter.getValue(exchange)))); - - //bind hashes commands - bind(Command.HDEL, exchange -> redisClient.hdel(exchangeConverter.getKey(exchange), - exchangeConverter.getField(exchange))); - bind(Command.HEXISTS, wrap(exchange -> 
redisClient.hexists(exchangeConverter.getKey(exchange), - exchangeConverter.getField(exchange)))); - bind(Command.HGET, wrap(exchange -> redisClient.hget(exchangeConverter.getKey(exchange), - exchangeConverter.getField(exchange)))); - bind(Command.HGETALL, wrap(exchange -> redisClient.hgetAll(exchangeConverter.getKey(exchange)))); - bind(Command.HINCRBY, wrap(exchange -> redisClient.hincrBy(exchangeConverter.getKey(exchange), - exchangeConverter.getField(exchange), - exchangeConverter.getValueAsLong(exchange)))); - bind(Command.HKEYS, wrap(exchange -> redisClient.hkeys(exchangeConverter.getKey(exchange)))); - bind(Command.HLEN, wrap(exchange -> redisClient.hlen(exchangeConverter.getKey(exchange)))); - bind(Command.HMGET, wrap(exchange -> redisClient.hmget(exchangeConverter.getKey(exchange), - exchangeConverter.getFields(exchange)))); - bind(Command.HMSET, exchange -> redisClient.hmset(exchangeConverter.getKey(exchange), - exchangeConverter.getValuesAsMap(exchange))); - bind(Command.HSET, exchange -> redisClient.hset(exchangeConverter.getKey(exchange), - exchangeConverter.getField(exchange), - exchangeConverter.getValue(exchange))); - bind(Command.HSETNX, wrap(exchange -> redisClient.hsetnx(exchangeConverter.getKey(exchange), - exchangeConverter.getField(exchange), - exchangeConverter.getValue(exchange)))); - bind(Command.HVALS, wrap(exchange -> redisClient.hvals(exchangeConverter.getKey(exchange)))); - //missing: hincrbyfloat, hstrlen, hscan - - //bind connection commands - bind(Command.ECHO, wrap(exchange -> redisClient.echo(exchangeConverter.getStringValue(exchange)))); - bind(Command.PING, wrap(exchange -> redisClient.ping())); - bind(Command.QUIT, exchange -> redisClient.quit()); - - //bind key commands - bind(Command.DEL, exchange -> redisClient.del(exchangeConverter.getKeys(exchange))); - bind(Command.EXISTS, wrap(exchange -> redisClient.exists(exchangeConverter.getKey(exchange)))); - bind(Command.EXPIRE, wrap(exchange -> 
redisClient.expire(exchangeConverter.getKey(exchange), - exchangeConverter.getTimeout(exchange)))); - bind(Command.EXPIREAT, wrap(exchange -> redisClient.expireat(exchangeConverter.getKey(exchange), - exchangeConverter.getTimestamp(exchange)))); - bind(Command.KEYS, wrap(exchange -> redisClient.keys(exchangeConverter.getPattern(exchange)))); - bind(Command.MOVE, wrap(exchange -> redisClient.move(exchangeConverter.getKey(exchange), - exchangeConverter.getDb(exchange)))); - bind(Command.PERSIST, wrap(exchange -> redisClient.persist(exchangeConverter.getKey(exchange)))); - bind(Command.PEXPIRE, wrap(exchange -> redisClient.pexpire(exchangeConverter.getKey(exchange), - exchangeConverter.getTimeout(exchange)))); - bind(Command.PEXPIREAT, wrap(exchange -> redisClient.pexpireat(exchangeConverter.getKey(exchange), - exchangeConverter.getTimestamp(exchange)))); - bind(Command.RANDOMKEY, wrap(exchange -> redisClient.randomkey())); - bind(Command.RENAME, exchange -> redisClient.rename(exchangeConverter.getKey(exchange), - exchangeConverter.getStringValue(exchange))); - bind(Command.RENAMENX, wrap(exchange -> redisClient.renamenx(exchangeConverter.getKey(exchange), - exchangeConverter.getStringValue(exchange)))); - bind(Command.SORT, wrap(exchange -> redisClient.sort(exchangeConverter.getKey(exchange)))); - bind(Command.TTL, wrap(exchange -> redisClient.ttl(exchangeConverter.getKey(exchange)))); - bind(Command.TYPE, wrap(exchange -> redisClient.type(exchangeConverter.getKey(exchange)))); - //missing: dump, migrate, object, pttl, restore, wait, scan - - //bind transaction commands - bind(Command.DISCARD, exchange -> redisClient.discard()); - bind(Command.EXEC, exchange -> redisClient.exec()); - bind(Command.MULTI, exchange -> redisClient.multi()); - bind(Command.WATCH, exchange -> redisClient.watch(exchangeConverter.getKeys(exchange))); - bind(Command.UNWATCH, exchange -> redisClient.unwatch()); - } - - private void bind(Command command, Processor processor) { - String cmd = 
command.name(); - bind(cmd, processor); - } - - - private void setResult(Exchange exchange, Object result) { - Message message; - if (exchange.getPattern().isOutCapable()) { - message = exchange.getOut(); - message.copyFrom(exchange.getIn()); - } else { - message = exchange.getIn(); + for (Map.Entry<Command, Processor> entry : redisProcessorsCreator.getRedisProcessors().entrySet()) { + bind(entry.getKey().name(), entry.getValue()); } - message.setBody(result); - } - - public Processor wrap(Function<Exchange, Object> supplier) { - return exchange -> { - Object result = supplier.apply(exchange); - setResult(exchange, result); - }; } } http://git-wip-us.apache.org/repos/asf/camel/blob/31b1a73d/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/SetsRedisProcessorsCreator.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/SetsRedisProcessorsCreator.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/SetsRedisProcessorsCreator.java new file mode 100644 index 0000000..da3bb2d --- /dev/null +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/SetsRedisProcessorsCreator.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.redis; + +import java.util.Map; + +import org.apache.camel.Processor; + /** Registers the processors for the Redis set commands SADD, SCARD, SDIFF, SDIFFSTORE, SINTER, SINTERSTORE, SISMEMBER, SMEMBERS, SMOVE, SPOP, SRANDMEMBER, SREM, SUNION and SUNIONSTORE. The bind() and wrap() helpers and the returned result field are not declared in this class; presumably they are inherited from AbstractRedisProcessorCreator (TODO confirm there). */ +public final class SetsRedisProcessorsCreator extends AbstractRedisProcessorCreator { + /** Binds every set command to a processor that reads its arguments from the exchange through exchangeConverter and calls the redisClient method of the same name. @param redisClient client whose set operations the processors invoke. @param exchangeConverter reads key, keys, value and destination from the exchange. @return the result field, presumably the command-to-processor map filled by bind() (TODO confirm in the base class). */ + Map<Command, Processor> getProcessors(RedisClient redisClient, ExchangeConverter exchangeConverter) { + bind(Command.SADD, wrap(exchange -> redisClient.sadd(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.SCARD, wrap(exchange -> redisClient.scard(exchangeConverter.getKey(exchange)))); + bind(Command.SDIFF, wrap(exchange -> redisClient.sdiff(exchangeConverter.getKey(exchange), + exchangeConverter.getKeys(exchange)))); /* The *STORE commands below are bound without wrap(), unlike the other commands in this method. */ + bind(Command.SDIFFSTORE, exchange -> redisClient.sdiffstore(exchangeConverter.getKey(exchange), + exchangeConverter.getKeys(exchange), + exchangeConverter.getDestination(exchange))); + bind(Command.SINTER, wrap(exchange -> redisClient.sinter(exchangeConverter.getKey(exchange), + exchangeConverter.getKeys(exchange)))); + bind(Command.SINTERSTORE, exchange -> redisClient.sinterstore(exchangeConverter.getKey(exchange), + exchangeConverter.getKeys(exchange), + exchangeConverter.getDestination(exchange))); + bind(Command.SISMEMBER, wrap(exchange -> redisClient.sismember(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.SMEMBERS, wrap(exchange -> redisClient.smembers(exchangeConverter.getKey(exchange)))); + bind(Command.SMOVE, wrap(exchange -> redisClient.smove(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange), +
exchangeConverter.getDestination(exchange)))); + bind(Command.SPOP, wrap(exchange -> redisClient.spop(exchangeConverter.getKey(exchange)))); + bind(Command.SRANDMEMBER, wrap(exchange -> redisClient.srandmember(exchangeConverter.getKey(exchange)))); + bind(Command.SREM, wrap(exchange -> redisClient.srem(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.SUNION, wrap(exchange -> redisClient.sunion(exchangeConverter.getKey(exchange), + exchangeConverter.getKeys(exchange)))); + bind(Command.SUNIONSTORE, exchange -> redisClient.sunionstore(exchangeConverter.getKey(exchange), + exchangeConverter.getKeys(exchange), + exchangeConverter.getDestination(exchange))); + //missing command sscan + + return result; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/31b1a73d/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/SortedSetsRedisProcessorsCreator.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/SortedSetsRedisProcessorsCreator.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/SortedSetsRedisProcessorsCreator.java new file mode 100644 index 0000000..c648047 --- /dev/null +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/SortedSetsRedisProcessorsCreator.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.redis; + +import java.util.Map; + +import org.apache.camel.Processor; + /** Registers the processors for the Redis sorted-set commands ZADD, ZCARD, ZCOUNT, ZINCRBY, ZINTERSTORE, ZRANGE, ZRANGEBYSCORE, ZRANK, ZREM, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREVRANGE, ZREVRANGEBYSCORE, ZREVRANK and ZUNIONSTORE. The bind() and wrap() helpers and the returned result field are not declared in this class; presumably they are inherited from AbstractRedisProcessorCreator (TODO confirm there). */ +public final class SortedSetsRedisProcessorsCreator extends AbstractRedisProcessorCreator { + /** Binds every sorted-set command to a processor that reads its arguments from the exchange through exchangeConverter and calls the redisClient method of the same name. @param redisClient client whose sorted-set operations the processors invoke. @param exchangeConverter reads key, keys, value, score, increment, min/max, start/end, the with-score flag and destination from the exchange. @return the result field, presumably the command-to-processor map filled by bind() (TODO confirm in the base class). */ + Map<Command, Processor> getProcessors(RedisClient redisClient, ExchangeConverter exchangeConverter) { + bind(Command.ZADD, wrap(exchange -> redisClient.zadd(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange), + exchangeConverter.getScore(exchange)))); + bind(Command.ZCARD, wrap(exchange -> redisClient.zcard(exchangeConverter.getKey(exchange)))); + bind(Command.ZCOUNT, wrap(exchange -> redisClient.zcount(exchangeConverter.getKey(exchange), + exchangeConverter.getMin(exchange), + exchangeConverter.getMax(exchange)))); + bind(Command.ZINCRBY, wrap(exchange -> redisClient.zincrby(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange), + exchangeConverter.getIncrement(exchange)))); /* ZINTERSTORE below, like ZREMRANGEBYRANK, ZREMRANGEBYSCORE and ZUNIONSTORE further down, is bound without wrap(). */ + bind(Command.ZINTERSTORE, exchange -> redisClient.zinterstore(exchangeConverter.getKey(exchange), + exchangeConverter.getKeys(exchange), + exchangeConverter.getDestination(exchange))); + bind(Command.ZRANGE, wrap(exchange -> redisClient.zrange(exchangeConverter.getKey(exchange), + exchangeConverter.getStart(exchange), + exchangeConverter.getEnd(exchange), + exchangeConverter.getWithScore(exchange)))); + bind(Command.ZRANGEBYSCORE, wrap(exchange -> redisClient.zrangebyscore(exchangeConverter.getKey(exchange), + exchangeConverter.getMin(exchange), + exchangeConverter.getMax(exchange)))); + bind(Command.ZRANK, wrap(exchange ->
redisClient.zrank(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.ZREM, wrap(exchange -> redisClient.zrem(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.ZREMRANGEBYRANK, exchange -> redisClient.zremrangebyrank(exchangeConverter.getKey(exchange), + exchangeConverter.getStart(exchange), + exchangeConverter.getEnd(exchange))); /* NOTE(review): ZREMRANGEBYSCORE below reads getStart/getEnd, whereas ZCOUNT, ZRANGEBYSCORE and ZREVRANGEBYSCORE read getMin/getMax for their score bounds; confirm this is intended. */ + bind(Command.ZREMRANGEBYSCORE, exchange -> redisClient.zremrangebyscore(exchangeConverter.getKey(exchange), + exchangeConverter.getStart(exchange), + exchangeConverter.getEnd(exchange))); + bind(Command.ZREVRANGE, wrap(exchange -> redisClient.zrevrange(exchangeConverter.getKey(exchange), + exchangeConverter.getStart(exchange), + exchangeConverter.getEnd(exchange), + exchangeConverter.getWithScore(exchange)))); + bind(Command.ZREVRANGEBYSCORE, wrap(exchange -> redisClient.zrevrangebyscore(exchangeConverter.getKey(exchange), + exchangeConverter.getMin(exchange), + exchangeConverter.getMax(exchange)))); + bind(Command.ZREVRANK, wrap(exchange -> redisClient.zrevrank(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.ZUNIONSTORE, exchange -> redisClient.zunionstore(exchangeConverter.getKey(exchange), + exchangeConverter.getKeys(exchange), + exchangeConverter.getDestination(exchange))); + //missing zlexcount, zrangebylex, zrevrangebylex, zremrangebylex, zscore, zscan + + return result; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/31b1a73d/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/StringsRedisProcessorsCreator.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/StringsRedisProcessorsCreator.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/StringsRedisProcessorsCreator.java new file mode 100644 index 0000000..ddc7c5f ---
/dev/null +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/StringsRedisProcessorsCreator.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.redis; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Processor; + /** Registers the processors for the Redis string commands APPEND, DECR, DECRBY, GET, GETBIT, GETRANGE, GETSET, INCR, INCRBY, MGET, MSET, MSETNX, SET, SETBIT, SETEX, SETNX, SETRANGE and STRLEN. The bind() and wrap() helpers and the returned result field are not declared in this class; presumably they are inherited from AbstractRedisProcessorCreator (TODO confirm there). */ +public final class StringsRedisProcessorsCreator extends AbstractRedisProcessorCreator { + /** Binds every string command to a processor that reads its arguments from the exchange through exchangeConverter and calls a redisClient method. @param redisClient client whose string operations the processors invoke. @param exchangeConverter reads key, value, offset, start/end, timeout and the multi-value map from the exchange. @return the result field, presumably the command-to-processor map filled by bind() (TODO confirm in the base class). */ + Map<Command, Processor> getProcessors(RedisClient redisClient, ExchangeConverter exchangeConverter) { + bind(Command.APPEND, wrap(exchange -> redisClient.append(exchangeConverter.getKey(exchange), + exchangeConverter.getStringValue(exchange)))); + bind(Command.DECR, wrap(exchange -> redisClient.decr(exchangeConverter.getKey(exchange)))); + bind(Command.DECRBY, wrap(exchange -> redisClient.decrby(exchangeConverter.getKey(exchange), + exchangeConverter.getLongValue(exchange)))); + bind(Command.GET, wrap(exchange -> redisClient.get(exchangeConverter.getKey(exchange)))); + bind(Command.GETBIT, wrap(exchange -> redisClient.getbit(exchangeConverter.getKey(exchange), + exchangeConverter.getOffset(exchange)))); + bind(Command.GETRANGE, wrap(exchange ->
redisClient.getrange(exchangeConverter.getKey(exchange), + exchangeConverter.getStart(exchange), + exchangeConverter.getEnd(exchange)))); + bind(Command.GETSET, wrap(exchange -> redisClient.getset(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); + bind(Command.INCR, wrap(exchange -> redisClient.incr(exchangeConverter.getKey(exchange)))); + bind(Command.INCRBY, wrap(exchange -> redisClient.incrby(exchangeConverter.getKey(exchange), + exchangeConverter.getLongValue(exchange)))); /* NOTE(review): MGET below takes its key list from getFields, not getKeys; confirm against ExchangeConverter. */ + bind(Command.MGET, wrap(exchange -> redisClient.mget(exchangeConverter.getFields(exchange)))); /* MSET, MSETNX, SET, SETBIT, SETEX and SETRANGE are bound without wrap(). */ + bind(Command.MSET, exchange -> redisClient.mset(exchangeConverter.getValuesAsMap(exchange))); + bind(Command.MSETNX, exchange -> redisClient.msetnx(exchangeConverter.getValuesAsMap(exchange))); + bind(Command.SET, exchange -> redisClient.set(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange))); + bind(Command.SETBIT, exchange -> redisClient.setbit(exchangeConverter.getKey(exchange), + exchangeConverter.getOffset(exchange), + exchangeConverter.getBooleanValue(exchange))); /* SETEX passes the timeout with a fixed unit of TimeUnit.SECONDS. */ + bind(Command.SETEX, exchange -> redisClient.setex(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange), + exchangeConverter.getTimeout(exchange), + TimeUnit.SECONDS)); + bind(Command.SETNX, wrap(exchange -> redisClient.setnx(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange)))); /* NOTE(review): SETRANGE below is bound to a three-argument redisClient.setex(key, value, offset), not to a method named setrange; presumably RedisClient exposes the offset write under that name, so confirm before renaming. */ + bind(Command.SETRANGE, exchange -> redisClient.setex(exchangeConverter.getKey(exchange), + exchangeConverter.getValue(exchange), + exchangeConverter.getOffset(exchange))); + bind(Command.STRLEN, wrap(exchange -> redisClient.strlen(exchangeConverter.getKey(exchange)))); + //missing bitcount, bitfield, bitop, bitpos, incrbyfloat, psetex + + return result; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/31b1a73d/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/TransactionRedisProcessorsCreator.java
---------------------------------------------------------------------- diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/TransactionRedisProcessorsCreator.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/TransactionRedisProcessorsCreator.java new file mode 100644 index 0000000..529757c --- /dev/null +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/TransactionRedisProcessorsCreator.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.redis; + +import java.util.Map; + +import org.apache.camel.Processor; + /** Registers the processors for the Redis transaction commands DISCARD, EXEC, MULTI, WATCH and UNWATCH. The bind() helper and the returned result field are not declared in this class; presumably they are inherited from AbstractRedisProcessorCreator (TODO confirm there). */ +public final class TransactionRedisProcessorsCreator extends AbstractRedisProcessorCreator { + /** Binds every transaction command to a processor that calls the redisClient method of the same name; none of them goes through wrap(). @param redisClient client whose transaction operations the processors invoke. @param exchangeConverter used only by WATCH, to read the keys from the exchange. @return the result field, presumably the command-to-processor map filled by bind() (TODO confirm in the base class). */ + Map<Command, Processor> getProcessors(RedisClient redisClient, ExchangeConverter exchangeConverter) { + bind(Command.DISCARD, exchange -> redisClient.discard()); + bind(Command.EXEC, exchange -> redisClient.exec()); + bind(Command.MULTI, exchange -> redisClient.multi()); + bind(Command.WATCH, exchange -> redisClient.watch(exchangeConverter.getKeys(exchange))); + bind(Command.UNWATCH, exchange -> redisClient.unwatch()); + + return result; + } + +}