Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 9dde6c147 -> 629560086
# IGNITE-45 - Examples Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/62956008 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/62956008 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/62956008 Branch: refs/heads/ignite-45 Commit: 629560086e9face071aeb3dc4f945dc779de8041 Parents: 9dde6c1 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Sat Mar 21 13:43:00 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Sat Mar 21 13:43:00 2015 -0700 ---------------------------------------------------------------------- .../streaming/marketdata/StreamMarketData.java | 8 ++--- .../streaming/marketdata/StreamMarketData.java | 2 +- .../org/apache/ignite/stream/StreamVisitor.java | 36 ++++++++++++-------- 3 files changed, 26 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62956008/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java index 513a2e1..6d46f04 100644 --- a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java +++ b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java @@ -18,9 +18,7 @@ package org.apache.ignite.examples.java7.streaming.marketdata; import org.apache.ignite.*; -import org.apache.ignite.examples.streaming.numbers.ExamplesUtils; import org.apache.ignite.examples.java7.*; -import org.apache.ignite.lang.*; import org.apache.ignite.stream.*; import java.util.*; @@ -64,9 +62,9 @@ public class StreamMarketData { try (IgniteDataStreamer<String, MarketTick> mktStmr = ignite.dataStreamer(mktCache.getName())) { // Note that we receive market data, but do not populate 'mktCache' (it remains empty). // Instead we update the instruments in the 'instCache'. - mktStmr.receiver(new StreamVisitor<>(new IgniteBiInClosure<IgniteCache<String, MarketTick>, Map.Entry<String, MarketTick>>() { + mktStmr.receiver(new StreamVisitor<String, MarketTick>() { @Override - public void apply(IgniteCache<String, MarketTick> mktCache, Map.Entry<String, MarketTick> e) { + public void visit(IgniteCache<String, MarketTick> mktCache, Map.Entry<String, MarketTick> e) { String symbol = e.getKey(); MarketTick tick = e.getValue(); @@ -81,7 +79,7 @@ public class StreamMarketData { instCache.put(symbol, inst); } - })); + }); // Stream market data into market data stream cache. while (true) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62956008/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java index ccec740..ed2a25a 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java @@ -62,7 +62,7 @@ public class StreamMarketData { try (IgniteDataStreamer<String, MarketTick> mktStmr = ignite.dataStreamer(mktCache.getName())) { // Note that we receive market data, but do not populate 'mktCache' (it remains empty). // Instead we update the instruments in the 'instCache'. - mktStmr.receiver(new StreamVisitor<>((cache, e) -> { + mktStmr.receiver(StreamVisitor.from((cache, e) -> { String symbol = e.getKey(); MarketTick tick = e.getValue(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62956008/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java index 0474278..105607a 100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java @@ -27,27 +27,35 @@ import java.util.*; * does not update the cache. If the tuple needs to be stored in the cache, * then {@code cache.put(...)} should be called explicitely. */ -public class StreamVisitor<K, V> implements StreamReceiver<K, V> { +public abstract class StreamVisitor<K, V> implements StreamReceiver<K, V> { /** */ private static final long serialVersionUID = 0L; - /** Tuple visitor. */ - private IgniteBiInClosure<IgniteCache<K, V>, Map.Entry<K, V>> vis; + /** {@inheritDoc} */ + @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException { + for (Map.Entry<K, V> entry : entries) + visit(cache, entry); + } /** - * Visitor to visit every stream key-value tuple. Note, that the visitor - * does not update the cache. If the tuple needs to be stored in the cache, - * then {@code cache.put(...)} should be called explicitely. + * Visits one cache entry. * - * @param vis Stream key-value tuple visitor. + * @param cache Cache. + * @param entry Visited entry. */ - public StreamVisitor(IgniteBiInClosure<IgniteCache<K, V>, Map.Entry<K, V>> vis) { - this.vis = vis; - } + protected abstract void visit(IgniteCache<K, V> cache, Map.Entry<K, V> entry); - /** {@inheritDoc} */ - @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException { - for (Map.Entry<K, V> entry : entries) - vis.apply(cache, entry); + /** + * Creates a new visitor based on instance of {@link IgniteBiInClosure}. + * + * @param c Closure. + * @return Stream visitor. + */ + public static <K, V> StreamVisitor<K, V> from(final IgniteBiInClosure<IgniteCache<K, V>, Map.Entry<K, V>> c) { + return new StreamVisitor<K, V>() { + @Override protected void visit(IgniteCache<K, V> cache, Map.Entry<K, V> entry) { + c.apply(cache, entry); + } + }; } }