Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 7a68a0678 -> 9b3b2bdb0
# ignite-45 - copied streaming to java7. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9b3b2bdb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9b3b2bdb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9b3b2bdb Branch: refs/heads/ignite-45 Commit: 9b3b2bdb0f4e070083f75bb0d0ea0d8bffbd8e14 Parents: 7a68a06 Author: Dmitiry Setrakyan <dsetrak...@gridgain.com> Authored: Sat Mar 21 12:13:00 2015 -0700 Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com> Committed: Sat Mar 21 12:13:00 2015 -0700 ---------------------------------------------------------------------- .../streaming/marketdata/StreamMarketData.java | 23 ++++++++++++-------- .../streaming/marketdata/StreamMarketData.java | 1 + 2 files changed, 15 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b3b2bdb/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java index 469f510..dd4c069 100644 --- a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java +++ b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java @@ -19,6 +19,7 @@ package org.apache.ignite.examples.java7.streaming.marketdata; import org.apache.ignite.*; import org.apache.ignite.examples.java8.*; +import org.apache.ignite.lang.*; import org.apache.ignite.stream.*; import java.util.*; @@ -62,19 +63,23 @@ public class StreamMarketData { try (IgniteDataStreamer<String, MarketTick> stmr = ignite.dataStreamer(mktCache.getName())) { // Note that we receive market data, but do not populate 'mktCache' (it remains empty). // Instead we update the instruments in the 'instCache'. - stmr.receiver(new StreamVisitor<>((cache, e) -> { - String symbol = e.getKey(); - MarketTick tick = e.getValue(); + stmr.receiver(new StreamVisitor<>(new IgniteBiInClosure<IgniteCache<String, MarketTick>, Map.Entry<String, MarketTick>>() { + @Override + public void apply(IgniteCache<String, MarketTick> mktCache, Map.Entry<String, MarketTick> e) { + String symbol = e.getKey(); + MarketTick tick = e.getValue(); - Instrument inst = instCache.get(symbol); + Instrument inst = instCache.get(symbol); - if (inst == null) - inst = new Instrument(symbol); + if (inst == null) + inst = new Instrument(symbol); - // Update cached instrument based on the latest market tick. - inst.update(tick); + // Don't populate market cache, as we don't use it for querying. + // Update cached instrument based on the latest market tick. + inst.update(tick); - instCache.put(symbol, inst); + instCache.put(symbol, inst); + } })); // Stream market data into market data stream cache. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b3b2bdb/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java index 104ed17..5b7f91f 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java @@ -71,6 +71,7 @@ public class StreamMarketData { if (inst == null) inst = new Instrument(symbol); + // Don't populate market cache, as we don't use it for querying. // Update cached instrument based on the latest market tick. inst.update(tick);