Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-45 9dde6c147 -> 629560086


# IGNITE-45 - Examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/62956008
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/62956008
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/62956008

Branch: refs/heads/ignite-45
Commit: 629560086e9face071aeb3dc4f945dc779de8041
Parents: 9dde6c1
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Sat Mar 21 13:43:00 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Sat Mar 21 13:43:00 2015 -0700

----------------------------------------------------------------------
 .../streaming/marketdata/StreamMarketData.java  |  8 ++---
 .../streaming/marketdata/StreamMarketData.java  |  2 +-
 .../org/apache/ignite/stream/StreamVisitor.java | 36 ++++++++++++--------
 3 files changed, 26 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62956008/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java
 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java
index 513a2e1..6d46f04 100644
--- 
a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java
+++ 
b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java
@@ -18,9 +18,7 @@
 package org.apache.ignite.examples.java7.streaming.marketdata;
 
 import org.apache.ignite.*;
-import org.apache.ignite.examples.streaming.numbers.ExamplesUtils;
 import org.apache.ignite.examples.java7.*;
-import org.apache.ignite.lang.*;
 import org.apache.ignite.stream.*;
 
 import java.util.*;
@@ -64,9 +62,9 @@ public class StreamMarketData {
             try (IgniteDataStreamer<String, MarketTick> mktStmr = 
ignite.dataStreamer(mktCache.getName())) {
                 // Note that we receive market data, but do not populate 
'mktCache' (it remains empty).
                 // Instead we update the instruments in the 'instCache'.
-                mktStmr.receiver(new StreamVisitor<>(new 
IgniteBiInClosure<IgniteCache<String, MarketTick>, Map.Entry<String, 
MarketTick>>() {
+                mktStmr.receiver(new StreamVisitor<String, MarketTick>() {
                     @Override
-                    public void apply(IgniteCache<String, MarketTick> 
mktCache, Map.Entry<String, MarketTick> e) {
+                    public void visit(IgniteCache<String, MarketTick> 
mktCache, Map.Entry<String, MarketTick> e) {
                         String symbol = e.getKey();
                         MarketTick tick = e.getValue();
 
@@ -81,7 +79,7 @@ public class StreamMarketData {
 
                         instCache.put(symbol, inst);
                     }
-                }));
+                });
 
                 // Stream market data into market data stream cache.
                 while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62956008/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
index ccec740..ed2a25a 100644
--- 
a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
+++ 
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
@@ -62,7 +62,7 @@ public class StreamMarketData {
             try (IgniteDataStreamer<String, MarketTick> mktStmr = 
ignite.dataStreamer(mktCache.getName())) {
                 // Note that we receive market data, but do not populate 
'mktCache' (it remains empty).
                 // Instead we update the instruments in the 'instCache'.
-                mktStmr.receiver(new StreamVisitor<>((cache, e) -> {
+                mktStmr.receiver(StreamVisitor.from((cache, e) -> {
                     String symbol = e.getKey();
                     MarketTick tick = e.getValue();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62956008/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java 
b/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java
index 0474278..105607a 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java
@@ -27,27 +27,35 @@ import java.util.*;
  * does not update the cache. If the tuple needs to be stored in the cache,
  * then {@code cache.put(...)} should be called explicitely.
  */
-public class StreamVisitor<K, V> implements StreamReceiver<K, V> {
+public abstract class StreamVisitor<K, V> implements StreamReceiver<K, V> {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Tuple visitor. */
-    private IgniteBiInClosure<IgniteCache<K, V>, Map.Entry<K, V>> vis;
+    /** {@inheritDoc} */
+    @Override public void receive(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) throws IgniteException {
+        for (Map.Entry<K, V> entry : entries)
+            visit(cache, entry);
+    }
 
     /**
-     * Visitor to visit every stream key-value tuple. Note, that the visitor
-     * does not update the cache. If the tuple needs to be stored in the cache,
-     * then {@code cache.put(...)} should be called explicitely.
+     * Visits one cache entry.
      *
-     * @param vis Stream key-value tuple visitor.
+     * @param cache Cache.
+     * @param entry Visited entry.
      */
-    public StreamVisitor(IgniteBiInClosure<IgniteCache<K, V>, Map.Entry<K, V>> 
vis) {
-        this.vis = vis;
-    }
+    protected abstract void visit(IgniteCache<K, V> cache, Map.Entry<K, V> 
entry);
 
-    /** {@inheritDoc} */
-    @Override public void receive(IgniteCache<K, V> cache, 
Collection<Map.Entry<K, V>> entries) throws IgniteException {
-        for (Map.Entry<K, V> entry : entries)
-            vis.apply(cache, entry);
+    /**
+     * Creates a new visitor based on instance of {@link IgniteBiInClosure}.
+     *
+     * @param c Closure.
+     * @return Stream visitor.
+     */
+    public static <K, V> StreamVisitor<K, V> from(final 
IgniteBiInClosure<IgniteCache<K, V>, Map.Entry<K, V>> c) {
+        return new StreamVisitor<K, V>() {
+            @Override protected void visit(IgniteCache<K, V> cache, 
Map.Entry<K, V> entry) {
+                c.apply(cache, entry);
+            }
+        };
     }
 }

Reply via email to