Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 4230e03a1 -> cc3c939d2
# IGNITE-45 - Examples Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cc3c939d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cc3c939d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cc3c939d Branch: refs/heads/ignite-45 Commit: cc3c939d2978d54d254c5a0aa887529d63ef75c0 Parents: 4230e03 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Sat Mar 21 21:03:35 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Sat Mar 21 21:03:35 2015 -0700 ---------------------------------------------------------------------- .../streaming/numbers/StreamRandomNumbers.java | 9 +++---- .../streaming/numbers/StreamRandomNumbers.java | 2 +- .../apache/ignite/stream/StreamTransformer.java | 26 +++++++++++--------- 3 files changed, 19 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cc3c939d/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java b/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java index efb5f28..f1e55d2 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java @@ -18,7 +18,6 @@ package org.apache.ignite.examples.streaming.numbers; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.examples.*; import org.apache.ignite.stream.*; @@ -59,16 +58,16 @@ public class StreamRandomNumbers { stmr.allowOverwrite(true); // Configure data transformation to count instances of the same word. - stmr.receiver(new StreamTransformer<>(new CacheEntryProcessor<Integer, Long, Object>() { - @Override - public Object process(MutableEntry<Integer, Long> e, Object... arg) { + stmr.receiver(new StreamTransformer<Integer, Long>() { + @Override public Object process(MutableEntry<Integer, Long> e, Object... objects) + throws EntryProcessorException { Long val = e.getValue(); e.setValue(val == null ? 1L : val + 1); return null; } - })); + }); // Stream random numbers into the streamer cache. while (true) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cc3c939d/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java index 25ab99b..630f4e3 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java @@ -57,7 +57,7 @@ public class StreamRandomNumbers { stmr.allowOverwrite(true); // Configure data transformation to count instances of the same word. - stmr.receiver(new StreamTransformer<>((e, arg) -> { + stmr.receiver(StreamTransformer.from((e, arg) -> { Long val = e.getValue(); e.setValue(val == null ? 1L : val + 1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cc3c939d/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java index 8c85e5d..3f22299 100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java @@ -27,25 +27,27 @@ import java.util.*; * Convenience adapter to transform update existing values in streaming cache * based on the previously cached value. */ -public class StreamTransformer<K, V> implements StreamReceiver<K, V> { +public abstract class StreamTransformer<K, V> implements StreamReceiver<K, V>, EntryProcessor<K, V, Object> { /** */ private static final long serialVersionUID = 0L; - /** Entry processor. */ - private EntryProcessor<K, V, Object> ep; + /** {@inheritDoc} */ + @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException { + for (Map.Entry<K, V> entry : entries) + cache.invoke(entry.getKey(), this); + } /** - * Entry processor to update cache values based on the previously cached value. + * Creates a new transformer based on instance of {@link CacheEntryProcessor}. * * @param ep Entry processor. + * @return Stream transformer. */ - public StreamTransformer(CacheEntryProcessor<K, V, Object> ep) { - this.ep = ep; - } - - /** {@inheritDoc} */ - @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException { - for (Map.Entry<K, V> entry : entries) - cache.invoke(entry.getKey(), ep); + public static <K, V> StreamTransformer<K, V> from(final CacheEntryProcessor<K, V, Object> ep) { + return new StreamTransformer<K, V>() { + @Override public Object process(MutableEntry<K, V> entry, Object... args) throws EntryProcessorException { + return ep.process(entry, args); + } + }; } }