Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-45 4230e03a1 -> cc3c939d2


# IGNITE-45 - Examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cc3c939d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cc3c939d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cc3c939d

Branch: refs/heads/ignite-45
Commit: cc3c939d2978d54d254c5a0aa887529d63ef75c0
Parents: 4230e03
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Sat Mar 21 21:03:35 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Sat Mar 21 21:03:35 2015 -0700

----------------------------------------------------------------------
 .../streaming/numbers/StreamRandomNumbers.java  |  9 +++----
 .../streaming/numbers/StreamRandomNumbers.java  |  2 +-
 .../apache/ignite/stream/StreamTransformer.java | 26 +++++++++++---------
 3 files changed, 19 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cc3c939d/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java
index efb5f28..f1e55d2 100644
--- 
a/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/numbers/StreamRandomNumbers.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.examples.streaming.numbers;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
 import org.apache.ignite.examples.*;
 import org.apache.ignite.stream.*;
 
@@ -59,16 +58,16 @@ public class StreamRandomNumbers {
                 stmr.allowOverwrite(true);
 
                 // Configure data transformation to count instances of the 
same word.
-                stmr.receiver(new StreamTransformer<>(new 
CacheEntryProcessor<Integer, Long, Object>() {
-                    @Override
-                    public Object process(MutableEntry<Integer, Long> e, 
Object... arg) {
+                stmr.receiver(new StreamTransformer<Integer, Long>() {
+                    @Override public Object process(MutableEntry<Integer, 
Long> e, Object... objects)
+                        throws EntryProcessorException {
                         Long val = e.getValue();
 
                         e.setValue(val == null ? 1L : val + 1);
 
                         return null;
                     }
-                }));
+                });
 
                 // Stream random numbers into the streamer cache.
                 while (true)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cc3c939d/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
index 25ab99b..630f4e3 100644
--- 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
@@ -57,7 +57,7 @@ public class StreamRandomNumbers {
                 stmr.allowOverwrite(true);
 
                 // Configure data transformation to count instances of the 
same word.
-                stmr.receiver(new StreamTransformer<>((e, arg) -> {
+                stmr.receiver(StreamTransformer.from((e, arg) -> {
                     Long val = e.getValue();
 
                     e.setValue(val == null ? 1L : val + 1);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cc3c939d/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java 
b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
index 8c85e5d..3f22299 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
@@ -27,25 +27,27 @@ import java.util.*;
  * Convenience adapter to transform update existing values in streaming cache
  * based on the previously cached value.
  */
-public class StreamTransformer<K, V> implements StreamReceiver<K, V> {
+public abstract class StreamTransformer<K, V> implements StreamReceiver<K, V>, 
EntryProcessor<K, V, Object> {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Entry processor. */
-    private EntryProcessor<K, V, Object> ep;
+    /** {@inheritDoc} */
+    @Override public void receive(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) throws IgniteException {
+        for (Map.Entry<K, V> entry : entries)
+            cache.invoke(entry.getKey(), this);
+    }
 
     /**
-     * Entry processor to update cache values based on the previously cached 
value.
+     * Creates a new transformer based on instance of {@link 
CacheEntryProcessor}.
      *
      * @param ep Entry processor.
+     * @return Stream transformer.
      */
-    public StreamTransformer(CacheEntryProcessor<K, V, Object> ep) {
-        this.ep = ep;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void receive(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) throws IgniteException {
-        for (Map.Entry<K, V> entry : entries)
-            cache.invoke(entry.getKey(), ep);
+    public static <K, V> StreamTransformer<K, V> from(final 
CacheEntryProcessor<K, V, Object> ep) {
+        return new StreamTransformer<K, V>() {
+            @Override public Object process(MutableEntry<K, V> entry, 
Object... args) throws EntryProcessorException {
+                return ep.process(entry, args);
+            }
+        };
     }
 }

Reply via email to